-rw-r--r--  .gitignore  2
-rwxr-xr-x  bin/run-example  13
-rwxr-xr-x  bin/spark-shell  11
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkConf.scala  28
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  142
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  44
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala  167
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala  68
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala  1
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala  41
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/partial/PartialResult.scala  16
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala  28
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala  24
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala  5
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/MemoryStore.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala  16
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala  25
-rw-r--r--  core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala  51
-rw-r--r--  core/src/test/resources/spark.conf  8
-rw-r--r--  core/src/test/scala/org/apache/spark/JavaAPISuite.java  17
-rw-r--r--  core/src/test/scala/org/apache/spark/SparkConfSuite.scala  47
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala  4
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala  2
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala  126
-rw-r--r--  docs/configuration.md  41
-rw-r--r--  docs/mllib-guide.md  51
-rw-r--r--  docs/running-on-yarn.md  2
-rw-r--r--  docs/scala-programming-guide.md  4
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java  4
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java  1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java  1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java  1
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java  4
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala  59
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala  3
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala  1
-rw-r--r--  examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala  1
-rw-r--r--  graphx/pom.xml  6
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala  68
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala  27
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala  29
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala  189
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala  30
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala  6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala  10
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala  15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala  14
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala  15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala  15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala  158
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala  13
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala  16
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala  16
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala  15
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala  14
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala  23
-rw-r--r--  pom.xml  2
-rw-r--r--  project/SparkBuild.scala  9
-rw-r--r--  project/project/SparkPluginBuild.scala  2
-rw-r--r--  python/pyspark/conf.py  10
-rw-r--r--  python/pyspark/mllib/__init__.py  10
-rw-r--r--  python/pyspark/rdd.py  25
-rw-r--r--  python/pyspark/tests.py  27
-rwxr-xr-x  python/run-tests  10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala  17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala  22
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala  4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala  55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala  11
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala  35
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  6
101 files changed, 1336 insertions(+), 723 deletions(-)
diff --git a/.gitignore b/.gitignore
index 39635d7eef..3d17899212 100644
--- a/.gitignore
+++ b/.gitignore
@@ -44,4 +44,4 @@ derby.log
dist/
spark-*-bin.tar.gz
unit-tests.log
-lib/
+/lib/
diff --git a/bin/run-example b/bin/run-example
index 2e9d51440b..adba7dd97a 100755
--- a/bin/run-example
+++ b/bin/run-example
@@ -76,11 +76,20 @@ else
fi
fi
+# Set JAVA_OPTS to be able to load native libraries and to set heap size
+JAVA_OPTS="$SPARK_JAVA_OPTS"
+JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
+# Load extra JAVA_OPTS from conf/java-opts, if it exists
+if [ -e "$FWDIR/conf/java-opts" ] ; then
+ JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
+fi
+export JAVA_OPTS
+
if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
echo -n "Spark Command: "
- echo "$RUNNER" -cp "$CLASSPATH" "$@"
+ echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
echo "========================================"
echo
fi
-exec "$RUNNER" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
diff --git a/bin/spark-shell b/bin/spark-shell
index e6885b51ef..05a46ee0ca 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -45,13 +45,18 @@ for o in "$@"; do
done
# Set MASTER from spark-env if possible
+DEFAULT_SPARK_MASTER_PORT=7077
if [ -z "$MASTER" ]; then
if [ -e "$FWDIR/conf/spark-env.sh" ]; then
. "$FWDIR/conf/spark-env.sh"
fi
- if [[ "x" != "x$SPARK_MASTER_IP" && "y" != "y$SPARK_MASTER_PORT" ]]; then
- MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
- export MASTER
+ if [ "x" != "x$SPARK_MASTER_IP" ]; then
+ if [ "y" != "y$SPARK_MASTER_PORT" ]; then
+ SPARK_MASTER_PORT="${SPARK_MASTER_PORT}"
+ else
+ SPARK_MASTER_PORT=$DEFAULT_SPARK_MASTER_PORT
+ fi
+ export MASTER="spark://${SPARK_MASTER_IP}:${SPARK_MASTER_PORT}"
fi
fi
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index fc0a749882..3081f927cc 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -49,7 +49,7 @@ object Partitioner {
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
- for (r <- bySize if r.partitioner != None) {
+ for (r <- bySize if r.partitioner.isDefined) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
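The Partitioner hunk above is one instance of a cleanup applied throughout this commit: comparisons against None are replaced with Option's own predicates. A minimal, self-contained sketch of the idiom in plain Scala (the values are illustrative, not Spark code):
{{{
val partitioner: Option[String] = Some("hash")

// Style this commit moves to: query the Option directly.
if (partitioner.isDefined) println("using " + partitioner.get)
if (partitioner.isEmpty) println("falling back to spark.default.parallelism")

// Style it removes: if (partitioner != None) ...
}}}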
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 369c6ce78f..45d19bcbfa 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -20,19 +20,17 @@ package org.apache.spark
import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
-import com.typesafe.config.ConfigFactory
import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
*
* Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
- * values from both the `spark.*` Java system properties and any `spark.conf` on your application's
- * classpath (if it has one). In this case, system properties take priority over `spark.conf`, and
- * any parameters you set directly on the `SparkConf` object take priority over both of those.
+ * values from any `spark.*` Java system properties set in your application as well. In this case,
+ * parameters you set directly on the `SparkConf` object take priority over system properties.
*
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
- * get the same configuration no matter what is on the classpath.
+ * get the same configuration no matter what the system properties are.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
@@ -40,7 +38,7 @@ import java.io.{ObjectInputStream, ObjectOutputStream, IOException}
* Note that once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*
- * @param loadDefaults whether to load values from the system properties and classpath
+ * @param loadDefaults whether to also load values from Java system properties
*/
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
@@ -50,11 +48,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
private val settings = new HashMap[String, String]()
if (loadDefaults) {
- ConfigFactory.invalidateCaches()
- val typesafeConfig = ConfigFactory.systemProperties()
- .withFallback(ConfigFactory.parseResources("spark.conf"))
- for (e <- typesafeConfig.entrySet().asScala if e.getKey.startsWith("spark.")) {
- settings(e.getKey) = e.getValue.unwrapped.toString
+ // Load any spark.* system properties
+ for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
+ settings(k) = v
}
}
@@ -196,7 +192,15 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
/** Get all akka conf variables set on this SparkConf */
- def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")}
+ def getAkkaConf: Seq[(String, String)] =
+ /* This is currently undocumented. If we want to make this public we should consider
+ * nesting options under the spark namespace to avoid conflicts with user akka options.
+ * Otherwise users configuring their own akka code via system properties could mess up
+ * spark's akka options.
+ *
+ * E.g. spark.akka.option.x.y.x = "value"
+ */
+ getAll.filter {case (k, v) => k.startsWith("akka.")}
/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.contains(key)
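The SparkConf changes above drop the Typesafe Config / spark.conf loading path in favour of plain spark.* Java system properties, with values set directly on the SparkConf taking priority. A hedged sketch of that precedence, assuming the properties are set before the SparkConf is constructed:
{{{
System.setProperty("spark.app.name", "FromSystemProperty")
System.setProperty("spark.master", "local[2]")

val conf = new org.apache.spark.SparkConf()  // loads the spark.* system properties
  .setAppName("SetDirectly")                 // direct setters win over system properties

assert(conf.get("spark.app.name") == "SetDirectly")
assert(conf.get("spark.master") == "local[2]")

// new SparkConf(false) skips external settings entirely, which is what the
// docstring recommends for unit tests.
}}}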
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ba3e91effb..566472e597 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -341,7 +341,7 @@ class SparkContext(
*/
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
- minSplits, cloneRecords = false).map(pair => pair._2.toString)
+ minSplits).map(pair => pair._2.toString)
}
/**
@@ -354,33 +354,37 @@ class SparkContext(
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
- def hadoopRDD[K: ClassTag, V: ClassTag](
+ def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits,
- cloneRecords: Boolean = true
+ minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// Add necessary security credentials to the JobConf before broadcasting it.
SparkHadoopUtil.get.addCredentials(conf)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+ new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K: ClassTag, V: ClassTag](
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ * */
+ def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int = defaultMinSplits,
- cloneRecords: Boolean = true
+ minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
@@ -392,8 +396,7 @@ class SparkContext(
inputFormatClass,
keyClass,
valueClass,
- minSplits,
- cloneRecords)
+ minSplits)
}
/**
@@ -403,16 +406,20 @@ class SparkContext(
* {{{
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
* }}}
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
- (path: String, minSplits: Int, cloneRecords: Boolean = true)
+ (path: String, minSplits: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
hadoopFile(path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
vm.runtimeClass.asInstanceOf[Class[V]],
- minSplits,
- cloneRecords)
+ minSplits)
}
/**
@@ -422,68 +429,91 @@ class SparkContext(
* {{{
* val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
* }}}
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, cloneRecords: Boolean = true)
+ def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits, cloneRecords)
+ hadoopFile[K, V, F](path, defaultMinSplits)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
- (path: String, cloneRecords: Boolean = true)
+ (path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
fm.runtimeClass.asInstanceOf[Class[F]],
km.runtimeClass.asInstanceOf[Class[K]],
- vm.runtimeClass.asInstanceOf[Class[V]],
- cloneRecords = cloneRecords)
+ vm.runtimeClass.asInstanceOf[Class[V]])
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
- def newAPIHadoopFile[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+ def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
- conf: Configuration = hadoopConfiguration,
- cloneRecords: Boolean = true): RDD[(K, V)] = {
+ conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
val job = new NewHadoopJob(conf)
NewFileInputFormat.addInputPath(job, new Path(path))
val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf, cloneRecords)
+ new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
}
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
- def newAPIHadoopRDD[K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]](
+ def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
kClass: Class[K],
- vClass: Class[V],
- cloneRecords: Boolean = true): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf, cloneRecords)
- }
-
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K: ClassTag, V: ClassTag](path: String,
+ vClass: Class[V]): RDD[(K, V)] = {
+ new NewHadoopRDD(this, fClass, kClass, vClass, conf)
+ }
+
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
+ def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean = true
+ minSplits: Int
): RDD[(K, V)] = {
val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits, cloneRecords)
+ hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K: ClassTag, V: ClassTag](path: String, keyClass: Class[K], valueClass: Class[V],
- cloneRecords: Boolean = true): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits, cloneRecords)
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ * */
+ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]
+ ): RDD[(K, V)] =
+ sequenceFile(path, keyClass, valueClass, defaultMinSplits)
/**
* Version of sequenceFile() for types implicitly convertible to Writables through a
@@ -500,9 +530,14 @@ class SparkContext(
* have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def sequenceFile[K, V]
- (path: String, minSplits: Int = defaultMinSplits, cloneRecords: Boolean = true)
+ (path: String, minSplits: Int = defaultMinSplits)
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
@@ -511,7 +546,7 @@ class SparkContext(
val format = classOf[SequenceFileInputFormat[Writable, Writable]]
val writables = hadoopFile(path, format,
kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits, cloneRecords)
+ vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) }
}
@@ -708,8 +743,11 @@ class SparkContext(
env.httpFileServer.addJar(new File(fileName))
} catch {
case e: Exception => {
+ // For now just log an error but allow to go through so spark examples work.
+ // The spark examples don't really need the jar distributed since its also
+ // the app jar.
logError("Error adding jar (" + e + "), was the --addJars option used?")
- throw e
+ null
}
}
} else {
@@ -722,8 +760,10 @@ class SparkContext(
path
}
}
- addedJars(key) = System.currentTimeMillis
- logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+ if (key != null) {
+ addedJars(key) = System.currentTimeMillis
+ logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+ }
}
}
@@ -956,6 +996,8 @@ class SparkContext(
}
}
+ def getCheckpointDir = checkpointDir
+
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = taskScheduler.defaultParallelism
@@ -1017,7 +1059,7 @@ object SparkContext {
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
- rdd: RDD[(K, V)]) =
+ rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@@ -1125,7 +1167,7 @@ object SparkContext {
if (sparkHome != null) {
res.setSparkHome(sparkHome)
}
- if (!jars.isEmpty) {
+ if (jars != null && !jars.isEmpty) {
res.setJars(jars)
}
res.setExecutorEnv(environment.toSeq)
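The '''Note:''' added throughout SparkContext above warns that Hadoop RecordReaders reuse a single Writable per record, so caching the returned RDD directly would cache many references to one mutable object. A hedged sketch of the recommended workaround (the path and split count are hypothetical; `sc` is an existing SparkContext):
{{{
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input", 4)

// Copy the reused Writables into immutable values before caching.
val copied = raw.map { case (offset, line) => (offset.get(), line.toString) }
copied.cache()
}}}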
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 0fb7e195b3..f430a33db1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -49,8 +49,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
- override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ override val classTag: ClassTag[(K, V)] = rdd.elementClassTag
import JavaPairRDD._
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index ebbbbd8806..4db7339e67 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -88,7 +88,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -119,7 +119,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@@ -134,13 +134,21 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue()))
}
/**
- * Return a new RDD by applying a function to each partition of this RDD.
+ * Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]):
JavaPairRDD[K2, V2] = {
@@ -148,6 +156,31 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType())
}
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning).map((x: java.lang.Double) => x.doubleValue()))
+ }
+
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
+ def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean):
+ JavaPairRDD[K2, V2] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Applies a function f to each partition of this RDD.
+ */
+ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) {
+ rdd.foreachPartition((x => f(asJavaIterator(x))))
+ }
+
/**
* Return an RDD created by coalescing all elements within each partition into an array.
*/
@@ -461,4 +494,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
def name(): String = rdd.name
+
+ /** Reset generator */
+ def setGenerator(_generator: String) = {
+ rdd.setGenerator(_generator)
+ }
}
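The new JavaRDDLike overloads expose the preservesPartitioning flag that the Scala RDD API already supports. A hedged Scala sketch of what the flag is for: a value-only transformation that keeps the existing hash partitioning, so downstream stages can reuse the partitioner instead of reshuffling.
{{{
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("mapPartitionsSketch"))
val pairs = sc.parallelize(Seq(("a", 10), ("b", 20)), 2).partitionBy(new HashPartitioner(2))

// Keys are untouched, so it is safe to declare the partitioner preserved.
val scaled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v / 100.0) },
  preservesPartitioning = true)
}}}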
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 4b73a0352f..5a426b9835 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -137,7 +137,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
*/
def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits)
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
+ /** Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ * */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -148,19 +154,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean
- ): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits, cloneRecords))
- }
-
- /** Get an RDD for a Hadoop SequenceFile. */
+ /** Get an RDD for a Hadoop SequenceFile.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
@@ -168,15 +168,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
- /** Get an RDD for a Hadoop SequenceFile. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V],
- cloneRecords: Boolean):
- JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, cloneRecords))
- }
-
/**
* Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
* BytesWritable values that contain a serialized partition. This is still an experimental storage
@@ -205,6 +196,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
* etc).
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
conf: JobConf,
@@ -218,41 +214,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
-
- /**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other
- * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),
- * using the older MapReduce API (`org.apache.hadoop.mapred`).
- *
- * @param conf JobConf for setting up the dataset
- * @param inputFormatClass Class of the [[InputFormat]]
- * @param keyClass Class of the keys
- * @param valueClass Class of the values
- * @param minSplits Minimum number of Hadoop Splits to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
- */
- def hadoopRDD[K, V, F <: InputFormat[K, V]](
- conf: JobConf,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean
- ): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits,
- cloneRecords))
- }
-
/**
* Get an RDD for a Hadoop-readable dataset from a Hadooop JobConf giving its InputFormat and any
* other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc).
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def hadoopRDD[K, V, F <: InputFormat[K, V]](
conf: JobConf,
@@ -265,7 +234,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -278,22 +253,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V, F <: InputFormat[K, V]](
- path: String,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean
- ): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass,
- minSplits, cloneRecords))
- }
-
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -306,23 +272,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
inputFormatClass, keyClass, valueClass))
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V, F <: InputFormat[K, V]](
- path: String,
- inputFormatClass: Class[F],
- keyClass: Class[K],
- valueClass: Class[V],
- cloneRecords: Boolean
- ): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(keyClass)
- implicit val vcm: ClassTag[V] = ClassTag(valueClass)
- new JavaPairRDD(sc.hadoopFile(path,
- inputFormatClass, keyClass, valueClass, cloneRecords = cloneRecords))
- }
-
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -338,22 +295,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/**
* Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
* and extra configuration options to pass to the input format.
- */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
- path: String,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V],
- conf: Configuration,
- cloneRecords: Boolean): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(kClass)
- implicit val vcm: ClassTag[V] = ClassTag(vClass)
- new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf, cloneRecords))
- }
-
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration,
@@ -365,21 +311,6 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
- */
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
- conf: Configuration,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V],
- cloneRecords: Boolean): JavaPairRDD[K, V] = {
- implicit val kcm: ClassTag[K] = ClassTag(kClass)
- implicit val vcm: ClassTag[V] = ClassTag(vClass)
- new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass, cloneRecords))
- }
-
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
@@ -513,6 +444,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.setCheckpointDir(dir)
}
+ def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
+
protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 82527fe663..70516bde8b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -52,6 +52,8 @@ private[spark] class PythonRDD[T: ClassTag](
val env = SparkEnv.get
val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+ @volatile var readerException: Exception = null
+
// Start a thread to feed the process input from our parent's iterator
new Thread("stdin writer for " + pythonExec) {
override def run() {
@@ -78,12 +80,14 @@ private[spark] class PythonRDD[T: ClassTag](
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
- for (elem <- parent.iterator(split, context)) {
- PythonRDD.writeToStream(elem, dataOut)
- }
+ PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
dataOut.flush()
worker.shutdownOutput()
} catch {
+ case e: java.io.FileNotFoundException =>
+ readerException = e
+ // Kill the Python worker process:
+ worker.shutdownOutput()
case e: IOException =>
// This can happen for legitimate reasons if the Python code stops returning data before we are done
// passing elements through, e.g., for take(). Just log a message to say it happened.
@@ -108,6 +112,9 @@ private[spark] class PythonRDD[T: ClassTag](
}
private def read(): Array[Byte] = {
+ if (readerException != null) {
+ throw readerException
+ }
try {
stream.readInt() match {
case length if length > 0 =>
@@ -206,20 +213,43 @@ private[spark] object PythonRDD {
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}
- def writeToStream(elem: Any, dataOut: DataOutputStream) {
- elem match {
- case bytes: Array[Byte] =>
- dataOut.writeInt(bytes.length)
- dataOut.write(bytes)
- case pair: (Array[Byte], Array[Byte]) =>
- dataOut.writeInt(pair._1.length)
- dataOut.write(pair._1)
- dataOut.writeInt(pair._2.length)
- dataOut.write(pair._2)
- case str: String =>
- dataOut.writeUTF(str)
- case other =>
- throw new SparkException("Unexpected element type " + other.getClass)
+ def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
+ // The right way to implement this would be to use TypeTags to get the full
+ // type of T. Since I don't want to introduce breaking changes throughout the
+ // entire Spark API, I have to use this hacky approach:
+ if (iter.hasNext) {
+ val first = iter.next()
+ val newIter = Seq(first).iterator ++ iter
+ first match {
+ case arr: Array[Byte] =>
+ newIter.asInstanceOf[Iterator[Array[Byte]]].foreach { bytes =>
+ dataOut.writeInt(bytes.length)
+ dataOut.write(bytes)
+ }
+ case string: String =>
+ newIter.asInstanceOf[Iterator[String]].foreach { str =>
+ dataOut.writeUTF(str)
+ }
+ case pair: Tuple2[_, _] =>
+ pair._1 match {
+ case bytePair: Array[Byte] =>
+ newIter.asInstanceOf[Iterator[Tuple2[Array[Byte], Array[Byte]]]].foreach { pair =>
+ dataOut.writeInt(pair._1.length)
+ dataOut.write(pair._1)
+ dataOut.writeInt(pair._2.length)
+ dataOut.write(pair._2)
+ }
+ case stringPair: String =>
+ newIter.asInstanceOf[Iterator[Tuple2[String, String]]].foreach { pair =>
+ dataOut.writeUTF(pair._1)
+ dataOut.writeUTF(pair._2)
+ }
+ case other =>
+ throw new SparkException("Unexpected Tuple2 element type " + pair._1.getClass)
+ }
+ case other =>
+ throw new SparkException("Unexpected element type " + first.getClass)
+ }
}
}
@@ -230,9 +260,7 @@ private[spark] object PythonRDD {
def writeToFile[T](items: Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
- for (item <- items) {
- writeToStream(item, file)
- }
+ writeIteratorToStream(items, file)
file.close()
}
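writeIteratorToStream above chooses a serialization path by peeking at the first element and then re-attaching it to the iterator, since the element type is not available at runtime. A generic, hedged sketch of that peek-and-rebuild pattern (the types and handlers are illustrative only, not Spark's):
{{{
def dispatch(iter: Iterator[Any]): Unit = {
  if (iter.hasNext) {
    val first = iter.next()
    val full = Iterator(first) ++ iter  // put the consumed element back in front
    first match {
      case _: Array[Byte] => full.foreach(_ => println("binary record"))
      case _: String      => full.foreach(s => println("string record: " + s))
      case other          => throw new IllegalArgumentException("Unexpected type " + other.getClass)
    }
  }
}
}}}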
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index e38459b883..449b953530 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,7 @@ private[spark] class ApplicationDescription(
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
- val sparkHome: String,
+ val sparkHome: Option[String],
val appUiUrl: String)
extends Serializable {
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5e824e1a67..83ce14a0a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -79,8 +79,7 @@ private[deploy] object DeployMessages {
execId: Int,
appDesc: ApplicationDescription,
cores: Int,
- memory: Int,
- sparkHome: String)
+ memory: Int)
extends DeployMessage
case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 27dc42bf7e..b479225b45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -28,7 +28,6 @@ import org.apache.spark.{SparkContext, SparkException}
/**
* Contains util methods to interact with Hadoop from Spark.
*/
-private[spark]
class SparkHadoopUtil {
val conf = newConfiguration()
UserGroupInformation.setConfiguration(conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ffa909c26b..8017932032 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,7 +49,7 @@ private[spark] object TestClient {
conf = new SparkConf)
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
- "dummy-spark-home", "ignored")
+ Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9ea96afcf..d49401f2fe 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
for (pos <- 0 until numUsable) {
if (assigned(pos) > 0) {
val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+ launchExecutor(usableWorkers(pos), exec)
app.state = ApplicationState.RUNNING
}
}
@@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val coresToUse = math.min(worker.coresFree, app.coresLeft)
if (coresToUse > 0) {
val exec = app.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec, app.desc.sparkHome)
+ launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
@@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
- def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
- exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
@@ -515,7 +515,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
- (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+ (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
}.foreach { w =>
workers -= w
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 9485bfd89e..f29a6ad2e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -67,7 +67,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
<li><strong>User:</strong> {app.desc.user}</li>
<li><strong>Cores:</strong>
{
- if (app.desc.maxCores == None) {
+ if (app.desc.maxCores.isEmpty) {
"Unlimited (%s granted)".format(app.coresGranted)
} else {
"%s (%s granted, %s left)".format(
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 312560d706..fbf2e0f30f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -76,7 +76,7 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- var sparkHome: File = null
+ val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -120,7 +120,6 @@ private[spark] class Worker(
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
- sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -204,22 +203,34 @@ private[spark] class Worker(
System.exit(1)
}
- case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+ case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- // TODO (pwendell): We shuld make sparkHome an Option[String] in
- // ApplicationDescription to be more explicit about this.
- val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- masterLock.synchronized {
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ try {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host,
+ appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
+ workDir, akkaUrl, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
+ } catch {
+ case e: Exception => {
+ logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
+ if (executors.contains(appId + "/" + execId)) {
+ executors(appId + "/" + execId).kill()
+ executors -= appId + "/" + execId
+ }
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+ }
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 6f9f29969e..e54ac0b332 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -80,7 +80,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
val subProperties = new mutable.HashMap[String, Properties]
import scala.collection.JavaConversions._
prop.foreach { kv =>
- if (regex.findPrefixOf(kv._1) != None) {
+ if (regex.findPrefixOf(kv._1).isDefined) {
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
}
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
index d71069444a..423ff67a5f 100644
--- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -71,7 +71,7 @@ private[spark] class ApproximateActionListener[T, U, R](
val finishTime = startTime + timeout
while (true) {
val time = System.currentTimeMillis()
- if (failure != None) {
+ if (failure.isDefined) {
throw failure.get
} else if (finishedTasks == totalTasks) {
return new PartialResult(evaluator.currentResult(), true)
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
index 5ce49b8100..812368e04a 100644
--- a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -31,10 +31,10 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
* Blocking method to wait for and return the final value.
*/
def getFinalValue(): R = synchronized {
- while (finalValue == None && failure == None) {
+ while (finalValue.isEmpty && failure.isEmpty) {
this.wait()
}
- if (finalValue != None) {
+ if (finalValue.isDefined) {
return finalValue.get
} else {
throw failure.get
@@ -46,11 +46,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
* is supported per PartialResult.
*/
def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
- if (completionHandler != None) {
+ if (completionHandler.isDefined) {
throw new UnsupportedOperationException("onComplete cannot be called twice")
}
completionHandler = Some(handler)
- if (finalValue != None) {
+ if (finalValue.isDefined) {
// We already have a final value, so let's call the handler
handler(finalValue.get)
}
@@ -63,11 +63,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
*/
def onFail(handler: Exception => Unit) {
synchronized {
- if (failureHandler != None) {
+ if (failureHandler.isDefined) {
throw new UnsupportedOperationException("onFail cannot be called twice")
}
failureHandler = Some(handler)
- if (failure != None) {
+ if (failure.isDefined) {
// We already have a failure, so let's call the handler
handler(failure.get)
}
@@ -102,7 +102,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
private[spark] def setFinalValue(value: R) {
synchronized {
- if (finalValue != None) {
+ if (finalValue.isDefined) {
throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
}
finalValue = Some(value)
@@ -117,7 +117,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
private[spark] def setFailure(exception: Exception) {
synchronized {
- if (failure != None) {
+ if (failure.isDefined) {
throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
}
failure = Some(exception)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index 83109d1a6f..30e578dd93 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -43,8 +43,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
val numPartitions =
// listStatus can throw exception if path does not exist.
if (fs.exists(cpath)) {
- val dirContents = fs.listStatus(cpath)
- val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+ val dirContents = fs.listStatus(cpath).map(_.getPath)
+ val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted
val numPart = partitionFiles.size
if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
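The CheckpointRDD change above filters on the final path component rather than anywhere in the full path string; a small illustrative sketch of why that matters (the path below is made up):

```scala
import org.apache.hadoop.fs.Path

// A directory whose name merely contains "part-" should not be mistaken
// for a checkpoint partition file.
val p = new Path("hdfs://nn:8020/checkpoints/part-time-jobs/_metadata")
println(p.toString.contains("part-"))    // true  -- the old check matches by accident
println(p.getName.startsWith("part-"))   // false -- only files actually named part-* match
```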
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index dbe76f3431..ad74d4636f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,10 +19,7 @@ package org.apache.spark.rdd
import java.io.EOFException
-import scala.reflect.ClassTag
-
import org.apache.hadoop.conf.{Configuration, Configurable}
-import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -34,7 +31,6 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.NextIterator
-import org.apache.spark.util.Utils.cloneWritables
/**
@@ -64,21 +60,15 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param minSplits Minimum number of Hadoop Splits (HadoopRDD partitions) to generate.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
*/
-class HadoopRDD[K: ClassTag, V: ClassTag](
+class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean = true)
+ minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
def this(
@@ -87,8 +77,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- minSplits: Int,
- cloneRecords: Boolean) = {
+ minSplits: Int) = {
this(
sc,
sc.broadcast(new SerializableWritable(conf))
@@ -97,8 +86,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
inputFormatClass,
keyClass,
valueClass,
- minSplits,
- cloneRecords)
+ minSplits)
}
protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
@@ -170,9 +158,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
val key: K = reader.createKey()
- val keyCloneFunc = cloneWritables[K](jobConf)
val value: V = reader.createValue()
- val valueCloneFunc = cloneWritables[V](jobConf)
override def getNext() = {
try {
finished = !reader.next(key, value)
@@ -180,11 +166,7 @@ class HadoopRDD[K: ClassTag, V: ClassTag](
case eof: EOFException =>
finished = true
}
- if (cloneRecords) {
- (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
- } else {
- (key, value)
- }
+ (key, value)
}
override def close() {
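With `cloneRecords` removed, `HadoopRDD` again hands out the `Writable` instances that most `RecordReader`s reuse across records, so callers that retain records should copy out immutable values first. A hedged sketch of that pattern (the path and app name are placeholders):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "writable-copy-example")

// Extract plain Scala values before caching or collecting, because the same
// LongWritable/Text objects may be reused for every record in a partition.
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://nn:8020/logs/input.txt")
val safe = lines.map { case (offset, text) => (offset.get, text.toString) }.cache()
println(safe.count())
```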
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 992bd4aa0a..d1fff29687 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -20,15 +20,11 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
-import scala.reflect.ClassTag
-
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
import org.apache.spark.{InterruptibleIterator, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
-import org.apache.spark.util.Utils.cloneWritables
-
private[spark]
class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
@@ -48,19 +44,13 @@ class NewHadoopPartition(rddId: Int, val index: Int, @transient rawSplit: InputS
* @param keyClass Class of the key associated with the inputFormatClass.
* @param valueClass Class of the value associated with the inputFormatClass.
* @param conf The Hadoop configuration.
- * @param cloneRecords If true, Spark will clone the records produced by Hadoop RecordReader.
- * Most RecordReader implementations reuse wrapper objects across multiple
- * records, and can cause problems in RDD collect or aggregation operations.
- * By default the records are cloned in Spark. However, application
- * programmers can explicitly disable the cloning for better performance.
*/
-class NewHadoopRDD[K: ClassTag, V: ClassTag](
+class NewHadoopRDD[K, V](
sc : SparkContext,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
- @transient conf: Configuration,
- cloneRecords: Boolean)
+ @transient conf: Configuration)
extends RDD[(K, V)](sc, Nil)
with SparkHadoopMapReduceUtil
with Logging {
@@ -107,8 +97,6 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback(() => close())
- val keyCloneFunc = cloneWritables[K](conf)
- val valueCloneFunc = cloneWritables[V](conf)
var havePair = false
var finished = false
@@ -125,13 +113,7 @@ class NewHadoopRDD[K: ClassTag, V: ClassTag](
throw new java.util.NoSuchElementException("End of stream")
}
havePair = false
- val key = reader.getCurrentKey
- val value = reader.getCurrentValue
- if (cloneRecords) {
- (keyCloneFunc(key.asInstanceOf[Writable]), valueCloneFunc(value.asInstanceOf[Writable]))
- } else {
- (key, value)
- }
+ (reader.getCurrentKey, reader.getCurrentValue)
}
private def close() {
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index cd90a1561a..1472c92b60 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -666,7 +666,7 @@ abstract class RDD[T: ClassTag](
}
var jobResult: Option[T] = None
val mergeResult = (index: Int, taskResult: Option[T]) => {
- if (taskResult != None) {
+ if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
case None => taskResult
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7046c06d20..237cbf4c0c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -877,7 +877,7 @@ class DAGScheduler(
logInfo("running: " + running)
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)
- if (stage.shuffleDep != None) {
+ if (stage.shuffleDep.isDefined) {
// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index c60e9896de..520c0b29e3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -46,7 +46,7 @@ private[spark] class Stage(
callSite: Option[String])
extends Logging {
- val isShuffleMap = shuffleDep != None
+ val isShuffleMap = shuffleDep.isDefined
val numPartitions = rdd.partitions.size
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
var numAvailableOutputs = 0
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6cc608ea5b..83ba584015 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -293,7 +293,7 @@ private[spark] class TaskSchedulerImpl(
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
- if (failedExecutor != None) {
+ if (failedExecutor.isDefined) {
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
@@ -387,7 +387,7 @@ private[spark] class TaskSchedulerImpl(
}
}
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
- if (failedExecutor != None) {
+ if (failedExecutor.isDefined) {
dagScheduler.executorLost(failedExecutor.get)
backend.reviveOffers()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5ad00a1ed1..e91470800c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -233,7 +233,7 @@ private[spark] class TaskSetManager(
/** Check whether a task is currently running an attempt on a given host */
private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
- !taskAttempts(taskIndex).exists(_.host == host)
+ taskAttempts(taskIndex).exists(_.host == host)
}
/**
@@ -592,7 +592,7 @@ private[spark] class TaskSetManager(
override def removeSchedulable(schedulable: Schedulable) {}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
- var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
+ var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]()
sortedTaskSetQueue += this
sortedTaskSetQueue
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index faa6e1ebe8..33aac52051 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
- val sparkHome = sc.getSparkHome().getOrElse(null)
+ val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index 42f52d7b26..3efe738a08 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
val blockMessageArray = new BlockMessageArray(blockMessage)
val resultMessage = connectionManager.sendMessageReliablySync(
toConnManagerId, blockMessageArray.toBufferMessage)
- resultMessage != None
+ resultMessage.isDefined
}
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 48cec4be41..530712b5df 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -138,6 +138,7 @@ private[spark] class DiskBlockObjectWriter(
fos = null
ts = null
objOut = null
+ initialized = false
}
}
@@ -145,7 +146,8 @@ private[spark] class DiskBlockObjectWriter(
override def commit(): Long = {
if (initialized) {
- // NOTE: Flush the serializer first and then the compressed/buffered output stream
+ // NOTE: Because Kryo doesn't flush the underlying stream we explicitly flush both the
+ // serializer stream and the lower level stream.
objOut.flush()
bs.flush()
val prevPos = lastValidPosition
@@ -175,7 +177,6 @@ private[spark] class DiskBlockObjectWriter(
}
override def fileSegment(): FileSegment = {
- val bytesWritten = lastValidPosition - initialPosition
new FileSegment(file, initialPosition, bytesWritten)
}
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 27f057b9f2..eb5a185216 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
- if (rddToAdd != None && rddToAdd == getRddId(blockId)) {
+ if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) {
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
"block from the same RDD")
return false
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index e2b24298a5..bb07c8cb13 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -23,10 +23,11 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
+import org.apache.spark.Logging
import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{MetadataCleanerType, MetadataCleaner, TimeStampedHashMap}
-import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -58,7 +59,7 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
private[spark]
-class ShuffleBlockManager(blockManager: BlockManager) {
+class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
def conf = blockManager.conf
// Turning off shuffle file consolidation causes all shuffle Blocks to get their own file.
@@ -106,6 +107,15 @@ class ShuffleBlockManager(blockManager: BlockManager) {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
val blockFile = blockManager.diskBlockManager.getFile(blockId)
+ // Because of previous failures, the shuffle file may already exist on this machine.
+ // If so, remove it.
+ if (blockFile.exists) {
+ if (blockFile.delete()) {
+ logInfo(s"Removed existing shuffle file $blockFile")
+ } else {
+ logWarning(s"Failed to remove existing shuffle file $blockFile")
+ }
+ }
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
}
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f600c7001a..861ad62f9f 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -46,27 +46,6 @@ import org.apache.spark.{SparkConf, SparkException, Logging}
*/
private[spark] object Utils extends Logging {
- /**
- * We try to clone for most common types of writables and we call WritableUtils.clone otherwise
- * intention is to optimize, for example for NullWritable there is no need and for Long, int and
- * String creating a new object with value set would be faster.
- */
- def cloneWritables[T: ClassTag](conf: Configuration): Writable => T = {
- val cloneFunc = classTag[T] match {
- case ClassTag(_: Text) =>
- (w: Writable) => new Text(w.asInstanceOf[Text].getBytes).asInstanceOf[T]
- case ClassTag(_: LongWritable) =>
- (w: Writable) => new LongWritable(w.asInstanceOf[LongWritable].get).asInstanceOf[T]
- case ClassTag(_: IntWritable) =>
- (w: Writable) => new IntWritable(w.asInstanceOf[IntWritable].get).asInstanceOf[T]
- case ClassTag(_: NullWritable) =>
- (w: Writable) => w.asInstanceOf[T] // TODO: should we clone this ?
- case _ =>
- (w: Writable) => WritableUtils.clone(w, conf).asInstanceOf[T] // slower way of cloning.
- }
- cloneFunc
- }
-
/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -692,7 +671,7 @@ private[spark] object Utils extends Logging {
for (el <- trace) {
if (!finished) {
- if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
+ if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
lastSparkMethod = if (el.getMethodName == "<init>") {
// Spark method is a constructor; get its class name
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 64e9b436f0..3d9b09ec33 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -20,14 +20,15 @@ package org.apache.spark.util.collection
import java.io._
import java.util.Comparator
-import it.unimi.dsi.fastutil.io.FastBufferedInputStream
-
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+
import org.apache.spark.{Logging, SparkEnv}
-import org.apache.spark.serializer.{KryoDeserializationStream, KryoSerializationStream, Serializer}
-import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter}
+import org.apache.spark.io.LZFCompressionCodec
+import org.apache.spark.serializer.{KryoDeserializationStream, Serializer}
+import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockObjectWriter}
/**
* An append-only map that spills sorted content to disk when there is insufficient space for it
@@ -153,9 +154,33 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
.format(mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
- val compressStream: OutputStream => OutputStream = blockManager.wrapForCompression(blockId, _)
+ /* IMPORTANT NOTE: To avoid having to keep large object graphs in memory, this approach
+ * closes and re-opens serialization and compression streams within each file. This makes some
+ * assumptions about the way that serialization and compression streams work, specifically:
+ *
+ * 1) The serializer input streams do not pre-fetch data from the underlying stream.
+ *
+ * 2) Several compression streams can be opened, written to, and flushed on the write path
+ * while only one compression input stream is created on the read path
+ *
+ * In practice (1) is only true for Java, so we add a special fix below to make it work for
+ * Kryo. (2) is only true for LZF and not Snappy, so we coerce this to use LZF.
+ *
+ * To avoid making these assumptions we should create an intermediate stream that batches
+ * objects and sends an EOF to the higher layer streams to make sure they never prefetch data.
+ * This is a bit tricky because, within each segment, you'd need to track the total number
+ * of bytes written and then re-wind and write it at the beginning of the segment. This will
+ * most likely require using the file channel API.
+ */
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ def wrapForCompression(outputStream: OutputStream) = {
+ if (shouldCompress) compressionCodec.compressedOutputStream(outputStream) else outputStream
+ }
+
def getNewWriter = new DiskBlockObjectWriter(blockId, file, serializer, fileBufferSize,
- compressStream, syncWrites)
+ wrapForCompression, syncWrites)
var writer = getNewWriter
var objectsWritten = 0
@@ -168,6 +193,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten == serializerBatchSize) {
writer.commit()
+ writer.close()
+ _diskBytesSpilled += writer.bytesWritten
writer = getNewWriter
objectsWritten = 0
}
@@ -176,8 +203,8 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
if (objectsWritten > 0) writer.commit()
} finally {
// Partial failures cannot be tolerated; do not revert partial writes
- _diskBytesSpilled += writer.bytesWritten
writer.close()
+ _diskBytesSpilled += writer.bytesWritten
}
currentMap = new SizeTrackingAppendOnlyMap[K, C]
spilledMaps.append(new DiskMapIterator(file, blockId))
@@ -331,7 +358,15 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
private class DiskMapIterator(file: File, blockId: BlockId) extends Iterator[(K, C)] {
val fileStream = new FileInputStream(file)
val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize)
- val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+
+ val shouldCompress = blockManager.shouldCompress(blockId)
+ val compressionCodec = new LZFCompressionCodec(sparkConf)
+ val compressedStream =
+ if (shouldCompress) {
+ compressionCodec.compressedInputStream(bufferedStream)
+ } else {
+ bufferedStream
+ }
var deserializeStream = ser.deserializeStream(compressedStream)
var objectsRead = 0
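The spill note above assumes that several compression output streams can be written back to back into one file while a single compression input stream reads them all, which is why the codec is coerced to LZF. A minimal standalone sketch of that property (file name and contents are arbitrary):

```scala
import java.io.{FileInputStream, FileOutputStream}
import org.apache.spark.SparkConf
import org.apache.spark.io.LZFCompressionCodec

val codec = new LZFCompressionCodec(new SparkConf(false))
val file  = java.io.File.createTempFile("spill-demo", ".lzf")

// Write two independent compressed segments into the same file, flushing the
// codec between them but keeping the underlying file stream open.
val fileOut = new FileOutputStream(file)
Seq("first batch;", "second batch").foreach { chunk =>
  val compressed = codec.compressedOutputStream(fileOut)
  compressed.write(chunk.getBytes("UTF-8"))
  compressed.flush()
}
fileOut.close()

// A single compressed input stream reads across both segments.
val in  = codec.compressedInputStream(new FileInputStream(file))
val all = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
println(new String(all, "UTF-8"))   // prints "first batch;second batch"
in.close()
```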
diff --git a/core/src/test/resources/spark.conf b/core/src/test/resources/spark.conf
deleted file mode 100644
index aa4e751235..0000000000
--- a/core/src/test/resources/spark.conf
+++ /dev/null
@@ -1,8 +0,0 @@
-# A simple spark.conf file used only in our unit tests
-
-spark.test.intTestProperty = 1
-
-spark.test {
- stringTestProperty = "hi"
- listTestProperty = ["a", "b"]
-}
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 23ec6c3b31..8c573ac0d6 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -387,18 +387,21 @@ public class JavaAPISuite implements Serializable {
return 1.0 * x;
}
}).cache();
+ doubles.collect();
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
}
}).cache();
+ pairs.collect();
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
@Override
public String call(Integer x) {
return x.toString();
}
}).cache();
+ strings.collect();
}
@Test
@@ -962,4 +965,18 @@ public class JavaAPISuite implements Serializable {
}
}
+
+ @Test
+ public void collectAsMapWithIntArrayValues() {
+ // Regression test for SPARK-1040
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
+ JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
+ @Override
+ public Tuple2<Integer, int[]> call(Integer x) throws Exception {
+ return new Tuple2<Integer, int[]>(x, new int[] { x });
+ }
+ });
+ pairRDD.collect(); // Works fine
+ Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index fa49974db4..87e9012622 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -20,35 +20,23 @@ package org.apache.spark
import org.scalatest.FunSuite
class SparkConfSuite extends FunSuite with LocalSparkContext {
- // This test uses the spark.conf in core/src/test/resources, which has a few test properties
- test("loading from spark.conf") {
- val conf = new SparkConf()
- assert(conf.get("spark.test.intTestProperty") === "1")
- assert(conf.get("spark.test.stringTestProperty") === "hi")
- // NOTE: we don't use list properties yet, but when we do, we'll have to deal with this syntax
- assert(conf.get("spark.test.listTestProperty") === "[a, b]")
- }
-
- // This test uses the spark.conf in core/src/test/resources, which has a few test properties
- test("system properties override spark.conf") {
+ test("loading from system properties") {
try {
- System.setProperty("spark.test.intTestProperty", "2")
+ System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
- assert(conf.get("spark.test.intTestProperty") === "2")
- assert(conf.get("spark.test.stringTestProperty") === "hi")
+ assert(conf.get("spark.test.testProperty") === "2")
} finally {
- System.clearProperty("spark.test.intTestProperty")
+ System.clearProperty("spark.test.testProperty")
}
}
test("initializing without loading defaults") {
try {
- System.setProperty("spark.test.intTestProperty", "2")
+ System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
- assert(!conf.contains("spark.test.intTestProperty"))
- assert(!conf.contains("spark.test.stringTestProperty"))
+ assert(!conf.contains("spark.test.testProperty"))
} finally {
- System.clearProperty("spark.test.intTestProperty")
+ System.clearProperty("spark.test.testProperty")
}
}
@@ -124,4 +112,25 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {
assert(sc.master === "local[2]")
assert(sc.appName === "My other app")
}
+
+ test("nested property names") {
+ // This wasn't supported by some external conf parsing libraries
+ try {
+ System.setProperty("spark.test.a", "a")
+ System.setProperty("spark.test.a.b", "a.b")
+ System.setProperty("spark.test.a.b.c", "a.b.c")
+ val conf = new SparkConf()
+ assert(conf.get("spark.test.a") === "a")
+ assert(conf.get("spark.test.a.b") === "a.b")
+ assert(conf.get("spark.test.a.b.c") === "a.b.c")
+ conf.set("spark.test.a.b", "A.B")
+ assert(conf.get("spark.test.a") === "a")
+ assert(conf.get("spark.test.a.b") === "A.B")
+ assert(conf.get("spark.test.a.b.c") === "a.b.c")
+ } finally {
+ System.clearProperty("spark.test.a")
+ System.clearProperty("spark.test.a.b")
+ System.clearProperty("spark.test.a.b.c")
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index d05bbd6ff7..693b1ab237 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
- new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}
def createAppInfo() : ApplicationInfo = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index a79ee690d3..4baa65659f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
f("ooga"), "blah", ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index c9f6cc5d07..ecac2f79a2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -287,7 +287,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// after the last failure.
(1 to manager.maxTaskFailures).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
- assert(offerResult != None,
+ assert(offerResult.isDefined,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 18aa587662..85011c6451 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -137,9 +137,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.getLocations("a1").size > 0, "master was not told about a1")
@@ -186,9 +186,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
val memStatus = master.getMemoryStatus.head._2
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
- assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
- assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
- assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
+ assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
// Checking whether master knows about the blocks or not
assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
@@ -259,7 +259,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a1") != None, "a1 was not in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
@@ -333,14 +333,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will getSingle rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
@@ -352,14 +352,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.getSingle("a1") === None, "a1 was in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will getSingle rid of a3
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
assert(store.getSingle("a3") === None, "a3 was in store")
}
@@ -374,8 +374,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
- assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
- assert(store.getSingle(rdd(0, 1)) != None, "rdd_0_1 was not in store")
+ assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
+ assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
// Check that rdd_0_3 doesn't replace them even after further accesses
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
@@ -392,7 +392,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
// Do a get() on rdd_0_2 so that it is the most recently used item
- assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
+ assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
@@ -413,9 +413,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
- assert(store.getSingle("a2") != None, "a2 was in store")
- assert(store.getSingle("a3") != None, "a3 was in store")
- assert(store.getSingle("a1") != None, "a1 was in store")
+ assert(store.getSingle("a2").isDefined, "a2 was in store")
+ assert(store.getSingle("a3").isDefined, "a3 was in store")
+ assert(store.getSingle("a1").isDefined, "a1 was in store")
}
test("disk and memory storage") {
@@ -426,11 +426,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}
test("disk and memory storage with getLocalBytes") {
@@ -441,11 +441,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
- assert(store.getLocalBytes("a2") != None, "a2 was not in store")
- assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
+ assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getLocalBytes("a1") != None, "a1 was not in store")
- assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}
test("disk and memory storage with serialization") {
@@ -456,11 +456,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getSingle("a1") != None, "a1 was not in store")
- assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ assert(store.getSingle("a1").isDefined, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}
test("disk and memory storage with serialization and getLocalBytes") {
@@ -471,11 +471,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
- assert(store.getLocalBytes("a2") != None, "a2 was not in store")
- assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+ assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
+ assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
- assert(store.getLocalBytes("a1") != None, "a1 was not in store")
- assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+ assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
+ assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
}
test("LRU with mixed storage levels") {
@@ -489,18 +489,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
- assert(store.getSingle("a1") != None, "a2 was not in store")
- assert(store.getSingle("a2") != None, "a3 was not in store")
- assert(store.getSingle("a3") != None, "a1 was not in store")
- assert(store.getSingle("a1") != None, "a2 was not in store")
- assert(store.getSingle("a2") != None, "a3 was not in store")
- assert(store.getSingle("a3") != None, "a1 was not in store")
+ assert(store.getSingle("a1").isDefined, "a2 was not in store")
+ assert(store.getSingle("a2").isDefined, "a3 was not in store")
+ assert(store.getSingle("a3").isDefined, "a1 was not in store")
+ assert(store.getSingle("a1").isDefined, "a2 was not in store")
+ assert(store.getSingle("a2").isDefined, "a3 was not in store")
+ assert(store.getSingle("a3").isDefined, "a1 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingle("a1") == None, "a1 was in store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
- assert(store.getSingle("a3") != None, "a3 was not in store")
- assert(store.getSingle("a4") != None, "a4 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
+ assert(store.getSingle("a3").isDefined, "a3 was not in store")
+ assert(store.getSingle("a4").isDefined, "a4 was not in store")
}
test("in-memory LRU with streams") {
@@ -511,18 +511,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- assert(store.get("list2") != None, "list2 was not in store")
+ assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.size == 2)
- assert(store.get("list3") != None, "list3 was not in store")
+ assert(store.get("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.size == 2)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2") != None, "list2 was not in store")
+ assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.size == 2)
// At this point list2 was gotten last, so LRU will getSingle rid of list3
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
- assert(store.get("list1") != None, "list1 was not in store")
+ assert(store.get("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.size == 2)
- assert(store.get("list2") != None, "list2 was not in store")
+ assert(store.get("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.size == 2)
assert(store.get("list3") === None, "list1 was in store")
}
@@ -538,26 +538,26 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
// At this point LRU should not kick in because list3 is only on disk
- assert(store.get("list1") != None, "list2 was not in store")
+ assert(store.get("list1").isDefined, "list2 was not in store")
assert(store.get("list1").get.size === 2)
- assert(store.get("list2") != None, "list3 was not in store")
+ assert(store.get("list2").isDefined, "list3 was not in store")
assert(store.get("list2").get.size === 2)
- assert(store.get("list3") != None, "list1 was not in store")
+ assert(store.get("list3").isDefined, "list1 was not in store")
assert(store.get("list3").get.size === 2)
- assert(store.get("list1") != None, "list2 was not in store")
+ assert(store.get("list1").isDefined, "list2 was not in store")
assert(store.get("list1").get.size === 2)
- assert(store.get("list2") != None, "list3 was not in store")
+ assert(store.get("list2").isDefined, "list3 was not in store")
assert(store.get("list2").get.size === 2)
- assert(store.get("list3") != None, "list1 was not in store")
+ assert(store.get("list3").isDefined, "list1 was not in store")
assert(store.get("list3").get.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.get("list1") === None, "list1 was in store")
- assert(store.get("list2") != None, "list3 was not in store")
+ assert(store.get("list2").isDefined, "list3 was not in store")
assert(store.get("list2").get.size === 2)
- assert(store.get("list3") != None, "list1 was not in store")
+ assert(store.get("list3").isDefined, "list1 was not in store")
assert(store.get("list3").get.size === 2)
- assert(store.get("list4") != None, "list4 was not in store")
+ assert(store.get("list4").isDefined, "list4 was not in store")
assert(store.get("list4").get.size === 2)
}
@@ -579,7 +579,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(store.getSingle("a1") === None, "a1 was in store")
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
- assert(store.getSingle("a2") != None, "a2 was not in store")
+ assert(store.getSingle("a2").isDefined, "a2 was not in store")
}
test("block compression") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 57e47bf1b9..5c4714dc24 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -18,8 +18,8 @@ Spark provides three locations to configure the system:
Spark properties control most application settings and are configured separately for each application.
The preferred way to set them is by passing a [SparkConf](api/core/index.html#org.apache.spark.SparkConf)
class to your SparkContext constructor.
-Alternatively, Spark will also load them from Java system properties (for compatibility with old versions
-of Spark) and from a [`spark.conf` file](#configuration-files) on your classpath.
+Alternatively, Spark will also load them from Java system properties, for compatibility with old versions
+of Spark.
SparkConf lets you configure most of the common properties to initialize a cluster (e.g., master URL and
application name), as well as arbitrary key-value pairs through the `set()` method. For example, we could
@@ -98,7 +98,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.default.parallelism</td>
<td>8</td>
<td>
- Default number of tasks to use for distributed shuffle operations (<code>groupByKey</code>,
+ Default number of tasks to use across the cluster for distributed shuffle operations (<code>groupByKey</code>,
<code>reduceByKey</code>, etc) when not set by user.
</td>
</tr>
@@ -158,7 +158,9 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.shuffle.spill.compress</td>
<td>true</td>
<td>
- Whether to compress data spilled during shuffles.
+ Whether to compress data spilled during shuffles. If enabled, spill compression
+ always uses the <code>org.apache.spark.io.LZFCompressionCodec</code> codec,
+ regardless of the value of <code>spark.io.compression.codec</code>.
</td>
</tr>
<tr>
@@ -379,13 +381,6 @@ Apart from these, the following properties are also available, and may be useful
Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>
-<tr>
- <td>akka.x.y....</td>
- <td>value</td>
- <td>
- An arbitrary akka configuration can be set directly on spark conf and it is applied for all the ActorSystems created spark wide for that SparkContext and its assigned executors as well.
- </td>
-</tr>
<tr>
<td>spark.shuffle.consolidateFiles</td>
@@ -475,30 +470,6 @@ Apart from these, the following properties are also available, and may be useful
The application web UI at `http://<driver>:4040` lists Spark properties in the "Environment" tab.
This is a useful place to check to make sure that your properties have been set correctly.
-## Configuration Files
-
-You can also configure Spark properties through a `spark.conf` file on your Java classpath.
-Because these properties are usually application-specific, we recommend putting this fine *only* on your
-application's classpath, and not in a global Spark classpath.
-
-The `spark.conf` file uses Typesafe Config's [HOCON format](https://github.com/typesafehub/config#json-superset),
-which is a superset of Java properties files and JSON. For example, the following is a simple config file:
-
-{% highlight awk %}
-# Comments are allowed
-spark.executor.memory = 512m
-spark.serializer = org.apache.spark.serializer.KryoSerializer
-{% endhighlight %}
-
-The format also allows hierarchical nesting, as follows:
-
-{% highlight awk %}
-spark.akka {
- threads = 8
- timeout = 200
-}
-{% endhighlight %}
-
# Environment Variables
Certain Spark settings can be configured through environment variables, which are read from the `conf/spark-env.sh`
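The configuration section above recommends passing a `SparkConf` to the `SparkContext` constructor; a minimal sketch of that pattern (all values are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("My app")
  .set("spark.executor.memory", "1g")   // arbitrary key-value pairs via set()
val sc = new SparkContext(conf)
```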
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index a22a22184b..0cc5505b50 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -438,3 +438,54 @@ signals), you can use the trainImplicit method to get better results.
# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, 1, 20)
{% endhighlight %}
+
+
+# Singular Value Decomposition
+Singular Value Decomposition for Tall and Skinny matrices.
+Given an *m x n* matrix *A*, we can compute matrices *U, S, V* such that
+
+*A = U * S * V^T*
+
+There is no restriction on m, but we require the n^2 entries of A^T A
+(stored as doubles) to fit in memory on one machine.
+Further, n should be less than m.
+
+The decomposition is computed by first computing *A^TA = V S^2 V^T*,
+computing SVD locally on that (since n x n is small),
+from which we recover S and V.
+Then we compute U via the matrix multiplication
+*U = A * V * S^-1*.
+
+Only the singular vectors associated with the largest k singular values
+are recovered. If there are k
+such values, then the dimensions of the returned matrices will be:
+
+* *S* is *k x k* and diagonal, holding the singular values on its diagonal.
+* *U* is *m x k* and satisfies U^T U = eye(k).
+* *V* is *n x k* and satisfies V^T V = eye(k).
+
+All input and output are expected in sparse matrix format: 0-indexed
+entries given as tuples of the form ((i,j), value), all wrapped in
+SparseMatrix RDDs. Below is example usage.
+
+{% highlight scala %}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.SparseMatrix
+import org.apache.spark.mllib.linalg.MatrixEntry
+
+// Load and parse the data file
+val data = sc.textFile("mllib/data/als/test.data").map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+}
+val m = 4
+val n = 4
+val k = 1
+
+// recover largest singular vector
+val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+val s = decomposed.S.data
+
+println("singular values = " + s.toArray.mkString)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 3bd62646ba..5dadd54492 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -133,7 +133,7 @@ See [Building Spark with Maven](building-with-maven.html) for instructions on ho
# Important Notes
-- We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed.
+- Before Hadoop 2.2, YARN does not support cores in container resource requests. Thus, when running against an earlier version, the number of cores given via command line arguments cannot be passed to YARN. Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
- The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored.
- The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN.
- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files.
diff --git a/docs/scala-programming-guide.md b/docs/scala-programming-guide.md
index c1ef46a1cd..7c0f67bc99 100644
--- a/docs/scala-programming-guide.md
+++ b/docs/scala-programming-guide.md
@@ -168,9 +168,9 @@ The following tables list the transformations and actions currently supported (s
Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
</tr>
<tr>
- <td> <b>mapPartitionsWithSplit</b>(<i>func</i>) </td>
+ <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
<td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
- the split, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
+ the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
</td>
</tr>
<tr>
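A minimal sketch of the renamed `mapPartitionsWithIndex` transformation (assumes an existing SparkContext `sc`, e.g. in the shell; names are illustrative):

```scala
val rdd = sc.parallelize(1 to 10, 3)   // three partitions

// The function receives the partition index alongside that partition's iterator.
val tagged = rdd.mapPartitionsWithIndex { (partitionIndex, iter) =>
  iter.map(x => s"partition $partitionIndex -> value $x")
}
tagged.collect().foreach(println)
```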
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 3ec4a58d48..ac8df02c46 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -30,11 +30,11 @@ public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
- System.err.println("Usage: JavaLogQuery <master> [slices]");
+ System.err.println("Usage: JavaSparkPi <master> [slices]");
System.exit(1);
}
- JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
+ JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi",
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 7b5a243e26..f061001dd2 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -70,5 +70,6 @@ public final class JavaFlumeEventCount {
}).print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 04f62ee204..2ffd351b4e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -104,5 +104,6 @@ public final class JavaKafkaWordCount {
wordCounts.print();
jssc.start();
+ jssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index 349d826ab5..7777c9832a 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -84,5 +84,6 @@ public final class JavaNetworkWordCount {
wordCounts.print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index 7ef9c6c8f4..26c44620ab 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -58,10 +58,9 @@ public final class JavaQueueStream {
}
for (int i = 0; i < 30; i++) {
- rddQueue.add(ssc.sc().parallelize(list));
+ rddQueue.add(ssc.sparkContext().parallelize(list));
}
-
// Create the QueueInputDStream and use it do some processing
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
@@ -81,5 +80,6 @@ public final class JavaQueueStream {
reducedStream.print();
ssc.start();
+ ssc.awaitTermination();
}
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
new file mode 100644
index 0000000000..19676fcc1a
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/SparkSVD.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.SVD
+import org.apache.spark.mllib.linalg.MatrixEntry
+import org.apache.spark.mllib.linalg.SparseMatrix
+
+/**
+ * Compute SVD of an example matrix
+ * Input file should be comma separated and 1 indexed, of the form
+ * i,j,value
+ * where i is the row, j is the column, and value is the matrix entry
+ *
+ * For example input file, see:
+ * mllib/data/als/test.data (example is 4 x 4)
+ */
+object SparkSVD {
+ def main(args: Array[String]) {
+ if (args.length != 4) {
+ System.err.println("Usage: SparkSVD <master> <file> m n")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SVD",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+ // Load and parse the data file
+ val data = sc.textFile(args(1)).map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt - 1, parts(1).toInt - 1, parts(2).toDouble)
+ }
+ val m = args(2).toInt
+ val n = args(3).toInt
+
+ // recover largest singular vector
+ val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
+ val u = decomposed.U.data
+ val s = decomposed.S.data
+ val v = decomposed.V.data
+
+ println("singular values = " + s.toArray.mkString)
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
index 57e1b1f806..a5888811cc 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala
@@ -88,7 +88,7 @@ extends Actor with Receiver {
override def preStart = remotePublisher ! SubscribeReceiver(context.self)
def receive = {
- case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+ case msg ⇒ pushBlock(msg.asInstanceOf[T])
}
override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
@@ -171,5 +171,6 @@ object ActorWordCount {
lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index a59be7899d..11c3aaad3c 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -60,5 +60,6 @@ object FlumeEventCount {
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
index 704b315ef8..954bcc9b6e 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala
@@ -50,6 +50,7 @@ object HdfsWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 4a3d81c09a..d9cb7326bb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -61,6 +61,7 @@ object KafkaWordCount {
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 78b49fdcf1..eb61caf8c8 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -101,5 +101,6 @@ object MQTTWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index 0226475712..5656d487a5 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -54,5 +54,6 @@ object NetworkWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
index 99b79c3949..cdd7547d0d 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RawNetworkGrep.scala
@@ -61,5 +61,6 @@ object RawNetworkGrep {
union.filter(_.contains("the")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 8c5d0bd568..aa82bf3c6b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -114,5 +114,6 @@ object RecoverableNetworkWordCount {
createContext(master, ip, port, outputPath)
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
index 1183eba846..88f1cef89b 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala
@@ -65,5 +65,6 @@ object StatefulNetworkWordCount {
val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
stateDstream.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 483c4d3118..bbd44948b6 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -110,5 +110,6 @@ object TwitterAlgebirdCMS {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 94c2bf29ac..a0094d460f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -87,5 +87,6 @@ object TwitterAlgebirdHLL {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 8a70d4a978..896d010c68 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -69,5 +69,6 @@ object TwitterPopularTags {
})
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index 12d2a1084f..85b4ce5e81 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -91,5 +91,6 @@ object ZeroMQWordCount {
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
+ ssc.awaitTermination()
}
}
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 4eca4747ea..baa240aff2 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -38,10 +38,14 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.jblas</groupId>
+ <artifactId>jblas</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
-
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index c327ce7935..ccd7de537b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -18,7 +18,7 @@
package org.apache.spark.graphx.lib
import scala.util.Random
-import org.apache.commons.math.linear._
+import org.jblas.DoubleMatrix
import org.apache.spark.rdd._
import org.apache.spark.graphx._
@@ -52,15 +52,15 @@ object SVDPlusPlus {
* @return a graph with vertex attributes containing the trained model
*/
def run(edges: RDD[Edge[Double]], conf: Conf)
- : (Graph[(RealVector, RealVector, Double, Double), Double], Double) =
+ : (Graph[(DoubleMatrix, DoubleMatrix, Double, Double), Double], Double) =
{
// Generate default vertex attribute
- def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
- val v1 = new ArrayRealVector(rank)
- val v2 = new ArrayRealVector(rank)
+ def defaultF(rank: Int): (DoubleMatrix, DoubleMatrix, Double, Double) = {
+ val v1 = new DoubleMatrix(rank)
+ val v2 = new DoubleMatrix(rank)
for (i <- 0 until rank) {
- v1.setEntry(i, Random.nextDouble())
- v2.setEntry(i, Random.nextDouble())
+ v1.put(i, Random.nextDouble())
+ v2.put(i, Random.nextDouble())
}
(v1, v2, 0.0, 0.0)
}
@@ -76,31 +76,32 @@ object SVDPlusPlus {
// Calculate initial bias and norm
val t0 = g.mapReduceTriplets(
et => Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
- (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+ (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
g = g.outerJoinVertices(t0) {
- (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
+ msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
def mapTrainF(conf: Conf, u: Double)
- (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
- : Iterator[(VertexId, (RealVector, RealVector, Double))] = {
+ (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
+ : Iterator[(VertexId, (DoubleMatrix, DoubleMatrix, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
+ var pred = u + usr._3 + itm._3 + q.dot(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = et.attr - pred
- val updateP = q.mapMultiply(err)
- .subtract(p.mapMultiply(conf.gamma7))
- .mapMultiply(conf.gamma2)
- val updateQ = usr._2.mapMultiply(err)
- .subtract(q.mapMultiply(conf.gamma7))
- .mapMultiply(conf.gamma2)
- val updateY = q.mapMultiply(err * usr._4)
- .subtract(itm._2.mapMultiply(conf.gamma7))
- .mapMultiply(conf.gamma2)
+ val updateP = q.mul(err)
+ .subColumnVector(p.mul(conf.gamma7))
+ .mul(conf.gamma2)
+ val updateQ = usr._2.mul(err)
+ .subColumnVector(q.mul(conf.gamma7))
+ .mul(conf.gamma2)
+ val updateY = q.mul(err * usr._4)
+ .subColumnVector(itm._2.mul(conf.gamma7))
+ .mul(conf.gamma2)
Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
(et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
}
@@ -110,34 +111,37 @@ object SVDPlusPlus {
g.cache()
val t1 = g.mapReduceTriplets(
et => Iterator((et.srcId, et.dstAttr._2)),
- (g1: RealVector, g2: RealVector) => g1.add(g2))
+ (g1: DoubleMatrix, g2: DoubleMatrix) => g1.addColumnVector(g2))
g = g.outerJoinVertices(t1) {
- (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
- if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
+ (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
+ msg: Option[DoubleMatrix]) =>
+ if (msg.isDefined) (vd._1, vd._1
+ .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
}
// Phase 2, update p for user nodes and q, y for item nodes
g.cache()
val t2 = g.mapReduceTriplets(
mapTrainF(conf, u),
- (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
- (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+ (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
+ (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
g = g.outerJoinVertices(t2) {
(vid: VertexId,
- vd: (RealVector, RealVector, Double, Double),
- msg: Option[(RealVector, RealVector, Double)]) =>
- (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
+ vd: (DoubleMatrix, DoubleMatrix, Double, Double),
+ msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
+ (vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
+ vd._3 + msg.get._3, vd._4)
}
}
// calculate error on training set
def mapTestF(conf: Conf, u: Double)
- (et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ (et: EdgeTriplet[(DoubleMatrix, DoubleMatrix, Double, Double), Double])
: Iterator[(VertexId, Double)] =
{
val (usr, itm) = (et.srcAttr, et.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
+ var pred = u + usr._3 + itm._3 + q.dot(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = (et.attr - pred) * (et.attr - pred)
@@ -146,7 +150,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
g = g.outerJoinVertices(t3) {
- (vid: VertexId, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}
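
For orientation, a small sketch of how the commons-math vector calls replaced above correspond to the jblas DoubleMatrix calls used in this file; the rank, error, and gamma values are arbitrary:

    import scala.util.Random
    import org.jblas.DoubleMatrix

    val rank = 4
    val p = new DoubleMatrix(rank)              // rank x 1 column vector (was ArrayRealVector)
    val q = new DoubleMatrix(rank)
    for (i <- 0 until rank) {
      p.put(i, Random.nextDouble())             // was setEntry
      q.put(i, Random.nextDouble())
    }
    val (err, gamma2, gamma7) = (0.5, 0.007, 0.015)  // arbitrary illustration values
    val pred = q.dot(p)                         // was dotProduct
    val updateP = q.mul(err)                    // was mapMultiply
      .subColumnVector(p.mul(gamma7))           // was subtract
      .mul(gamma2)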
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
new file mode 100644
index 0000000000..416996fcbe
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixEntry.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents an entry in a sparse matrix of doubles.
+ *
+ * @param i row index (0 indexing used)
+ * @param j column index (0 indexing used)
+ * @param mval value of entry in matrix
+ */
+case class MatrixEntry(val i: Int, val j: Int, val mval: Double)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
new file mode 100644
index 0000000000..319f82b449
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/MatrixSVD.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+/**
+ * Class that represents the singular value decomposition (SVD) of a matrix
+ *
+ * @param U such that A = USV^T
+ * @param S such that A = USV^T
+ * @param V such that A = USV^T
+ */
+case class MatrixSVD(val U: SparseMatrix,
+ val S: SparseMatrix,
+ val V: SparseMatrix)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
new file mode 100644
index 0000000000..e476b53450
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+
+/**
+ * Class used to obtain singular value decompositions
+ */
+class SVD {
+ private var k: Int = 1
+
+ /**
+ * Set the number of top-k singular vectors to return
+ */
+ def setK(k: Int): SVD = {
+ this.k = k
+ this
+ }
+
+ /**
+ * Compute SVD using the current set parameters
+ */
+ def compute(matrix: SparseMatrix) : MatrixSVD = {
+ SVD.sparseSVD(matrix, k)
+ }
+}
+
+
+/**
+ * Top-level methods for calling Singular Value Decomposition
+ * NOTE: All matrices are in 0-indexed sparse format RDD[((int, int), value)]
+ */
+object SVD {
+/**
+ * Singular Value Decomposition for Tall and Skinny matrices.
+ * Given an m x n matrix A, this will compute matrices U, S, V such that
+ * A = U * S * V'
+ *
+ * There is no restriction on m, but we require n^2 doubles to fit in memory.
+ * Further, n should be less than m.
+ *
+ * The decomposition is computed by first computing A'A = V S^2 V',
+ * computing svd locally on that (since n x n is small),
+ * from which we recover S and V.
+ * Then we compute U via easy matrix multiplication
+ * as U = A * V * S^-1
+ *
+ * Only the k largest singular values and associated vectors are found.
+ * If there are k such values, then the dimensions of the return will be:
+ *
+ * S is k x k and diagonal, holding the singular values on its diagonal
+ * U is m x k and satisfies U'U = eye(k)
+ * V is n x k and satisfies V'V = eye(k)
+ *
+ * All input and output is expected in sparse matrix format, 0-indexed
+ * as tuples of the form ((i,j),value) all in RDDs using the
+ * SparseMatrix class
+ *
+ * @param matrix sparse matrix to factorize
+ * @param k Recover k singular values and vectors
+ * @return Three sparse matrices: U, S, V such that A = USV^T
+ */
+ def sparseSVD(
+ matrix: SparseMatrix,
+ k: Int)
+ : MatrixSVD =
+ {
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
+
+ if (m < n || m <= 0 || n <= 0) {
+ throw new IllegalArgumentException("Expecting a tall and skinny matrix")
+ }
+
+ if (k < 1 || k > n) {
+ throw new IllegalArgumentException("Must request up to n singular values")
+ }
+
+ // Compute A^T A, assuming rows are sparse enough to fit in memory
+ val rows = data.map(entry =>
+ (entry.i, (entry.j, entry.mval))).groupByKey()
+ val emits = rows.flatMap{ case (rowind, cols) =>
+ cols.flatMap{ case (colind1, mval1) =>
+ cols.map{ case (colind2, mval2) =>
+ ((colind1, colind2), mval1*mval2) } }
+ }.reduceByKey(_+_)
+
+ // Construct jblas A^T A locally
+ val ata = DoubleMatrix.zeros(n, n)
+ for (entry <- emits.toArray) {
+ ata.put(entry._1._1, entry._1._2, entry._2)
+ }
+
+ // Since A^T A is small, we can compute its SVD directly
+ val svd = Singular.sparseSVD(ata)
+ val V = svd(0)
+ val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)
+
+ if (sigmas.size < k) {
+ throw new Exception("Not enough singular values to return")
+ }
+
+ val sigma = sigmas.take(k)
+
+ val sc = data.sparkContext
+
+ // prepare V for returning
+ val retVdata = sc.makeRDD(
+ Array.tabulate(V.rows, sigma.length){ (i,j) =>
+ MatrixEntry(i, j, V.get(i,j)) }.flatten)
+ val retV = SparseMatrix(retVdata, V.rows, sigma.length)
+
+ val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
+ x => MatrixEntry(x, x, sigma(x))})
+
+ val retS = SparseMatrix(retSdata, sigma.length, sigma.length)
+
+ // Compute U as U = A V S^-1
+ // turn V S^-1 into an RDD as a sparse matrix
+ val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
+ { (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)
+
+ // Multiply A by VS^-1
+ val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
+ val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
+ val retUdata = aCols.join(bRows).map( {case (key, ( (rowInd, rowVal), (colInd, colVal)) )
+ => ((rowInd, colInd), rowVal*colVal)}).reduceByKey(_+_)
+ .map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}
+ val retU = SparseMatrix(retUdata, m, sigma.length)
+
+ MatrixSVD(retU, retS, retV)
+ }
+
+
+ def main(args: Array[String]) {
+ if (args.length < 8) {
+ println("Usage: SVD <master> <matrix_file> <m> <n> " +
+ "<k> <output_U_file> <output_S_file> <output_V_file>")
+ System.exit(1)
+ }
+
+ val (master, inputFile, m, n, k, output_u, output_s, output_v) =
+ (args(0), args(1), args(2).toInt, args(3).toInt,
+ args(4).toInt, args(5), args(6), args(7))
+
+ val sc = new SparkContext(master, "SVD")
+
+ val rawdata = sc.textFile(inputFile)
+ val data = rawdata.map { line =>
+ val parts = line.split(',')
+ MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
+ }
+
+ val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
+ val u = decomposed.U.data
+ val s = decomposed.S.data
+ val v = decomposed.V.data
+
+ println("Computed " + s.toArray.length + " singular values and vectors")
+ u.saveAsTextFile(output_u)
+ s.saveAsTextFile(output_s)
+ v.saveAsTextFile(output_v)
+ System.exit(0)
+ }
+}
+
+
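
Restating the sparseSVD comment above as equations (a transcription of the documented approach, not new theory): with the thin decomposition

    A = U \Sigma V^T, \qquad A^T A = V \Sigma^2 V^T, \qquad U = A V \Sigma^{-1}

so only the n x n Gram matrix A^T A needs to fit on the driver; Sigma and V come from its local SVD, and U from one distributed multiply, exactly as the code computes retS, retV and retU.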
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
new file mode 100644
index 0000000000..cbd1a2a5a4
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SparseMatrix.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Class that represents a sparse matrix
+ *
+ * @param data RDD of nonzero entries
+ * @param m number of rows
+ * @param n number of columns
+ */
+case class SparseMatrix(val data: RDD[MatrixEntry], val m: Int, val n: Int)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index 4c51f4f881..37124f261e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -86,13 +86,17 @@ class L1Updater extends Updater {
/**
* Updater that adjusts the learning rate and performs L2 regularization
+ *
+ * See, for example, explanation of gradient and loss with L2 regularization on slide 21-22
+ * of <a href="http://people.cs.umass.edu/~sheldon/teaching/2012fa/ml/files/lec7-annotated.pdf">
+ * these slides</a>.
*/
class SquaredL2Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
val thisIterStepSize = stepSize / math.sqrt(iter)
val normGradient = gradient.mul(thisIterStepSize)
- val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0)
+ val newWeights = weightsOld.mul(1.0 - 2.0 * thisIterStepSize * regParam).sub(normGradient)
(newWeights, pow(newWeights.norm2, 2.0) * regParam)
}
}
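
Written out, the corrected SquaredL2Updater step above is a plain gradient step on the L2-regularized objective; here \eta_t denotes stepSize / \sqrt{t} and \lambda denotes regParam (just a transcription of the code):

    w_{t+1} = (1 - 2\eta_t\lambda)\, w_t - \eta_t \nabla L(w_t)
            = w_t - \eta_t \bigl( \nabla L(w_t) + 2\lambda w_t \bigr)

and the second element of the returned pair is the regularization value \lambda \lVert w_{t+1} \rVert_2^2.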
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 89ee07063d..c5f64b1350 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.recommendation
import scala.collection.mutable.{ArrayBuffer, BitSet}
+import scala.math.{abs, sqrt}
import scala.util.Random
import scala.util.Sorting
@@ -301,7 +302,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
* Make a random factor vector with the given random.
*/
private def randomFactor(rank: Int, rand: Random): Array[Double] = {
- Array.fill(rank)(rand.nextDouble)
+ // Choose a unit vector uniformly at random from the unit sphere, but from the
+ // "first quadrant" where all elements are nonnegative. This can be done by choosing
+ // elements distributed as Normal(0,1) and taking the absolute value, and then normalizing.
+ // This appears to create factorizations that have a slightly better reconstruction
+ // (<1%) compared to picking elements uniformly at random in [0,1].
+ val factor = Array.fill(rank)(abs(rand.nextGaussian()))
+ val norm = sqrt(factor.map(x => x * x).sum)
+ factor.map(x => x / norm)
}
/**
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 02ede71137..05322b024d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object LogisticRegressionSuite {
@@ -66,19 +67,7 @@ object LogisticRegressionSuite {
}
-class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
-
+class LogisticRegressionSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
prediction != expected.label
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index b615f76e66..9dd6c79ee6 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -23,7 +23,7 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.util.LocalSparkContext
object NaiveBayesSuite {
@@ -59,17 +59,7 @@ object NaiveBayesSuite {
}
}
-class NaiveBayesSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class NaiveBayesSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOfPredictions = predictions.zip(input).count {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index 3357b86f9b..bc7abb568a 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -25,8 +25,9 @@ import org.scalatest.FunSuite
import org.jblas.DoubleMatrix
-import org.apache.spark.{SparkException, SparkContext}
+import org.apache.spark.SparkException
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object SVMSuite {
@@ -58,17 +59,7 @@ object SVMSuite {
}
-class SVMSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class SVMSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 73657cac89..4ef1d1f64f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -21,20 +21,9 @@ package org.apache.spark.mllib.clustering
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.util.LocalSparkContext
-
-class KMeansSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class KMeansSuite extends FunSuite with LocalSparkContext {
val EPSILON = 1e-4
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
new file mode 100644
index 0000000000..32f3f141cd
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import scala.util.Random
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.jblas._
+
+class SVDSuite extends FunSuite with BeforeAndAfterAll {
+ @transient private var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ }
+
+ override def afterAll() {
+ sc.stop()
+ System.clearProperty("spark.driver.port")
+ }
+
+ val EPSILON = 1e-4
+
+ // Return jblas matrix from sparse matrix RDD
+ def getDenseMatrix(matrix: SparseMatrix) : DoubleMatrix = {
+ val data = matrix.data
+ val m = matrix.m
+ val n = matrix.n
+ val ret = DoubleMatrix.zeros(m, n)
+ matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval))
+ ret
+ }
+
+ def assertMatrixEquals(a: DoubleMatrix, b: DoubleMatrix) {
+ assert(a.rows == b.rows && a.columns == b.columns, "dimension mismatch")
+ val diff = DoubleMatrix.zeros(a.rows, a.columns)
+ Array.tabulate(a.rows, a.columns){(i, j) =>
+ diff.put(i, j,
+ Math.min(Math.abs(a.get(i, j) - b.get(i, j)),
+ Math.abs(a.get(i, j) + b.get(i, j)))) }
+ assert(diff.norm1 < EPSILON, "matrix mismatch: " + diff.norm1)
+ }
+
+ test("full rank matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1) / (1 + a + b)) }.flatten )
+
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, n)
+ val u = decomposed.U
+ val v = decomposed.V
+ val s = decomposed.S
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
+ assertMatrixEquals(retv, svd(2))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("rank one matrix svd") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m, n){ (a,b) =>
+ MatrixEntry(a, b, 1.0) }.flatten )
+ val k = 1
+
+ val a = SparseMatrix(data, m, n)
+
+ val decomposed = SVD.sparseSVD(a, k)
+ val u = decomposed.U
+ val s = decomposed.S
+ val v = decomposed.V
+ val retrank = s.data.toArray.length
+
+ assert(retrank == 1, "rank returned not one")
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+
+ // check multiplication guarantee
+ assertMatrixEquals(retu.mmul(rets).mmul(retv.transpose), densea)
+ }
+
+ test("truncated with k") {
+ val m = 10
+ val n = 3
+ val data = sc.makeRDD(Array.tabulate(m,n){ (a, b) =>
+ MatrixEntry(a, b, (a + 2).toDouble * (b + 1)/(1 + a + b)) }.flatten )
+ val a = SparseMatrix(data, m, n)
+
+ val k = 1 // only one svalue above this
+
+ val decomposed = SVD.sparseSVD(a, k)
+ val u = decomposed.U
+ val s = decomposed.S
+ val v = decomposed.V
+ val retrank = s.data.toArray.length
+
+ val densea = getDenseMatrix(a)
+ val svd = Singular.sparseSVD(densea)
+
+ val retu = getDenseMatrix(u)
+ val rets = getDenseMatrix(s)
+ val retv = getDenseMatrix(v)
+
+ assert(retrank == 1, "rank returned not one")
+
+ // check individual decomposition
+ assertMatrixEquals(retu, svd(0).getColumn(0))
+ assertMatrixEquals(rets, DoubleMatrix.diag(svd(1).getRow(0)))
+ assertMatrixEquals(retv, svd(2).getColumn(0))
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index a6028a1e98..a453de6767 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.matchers.ShouldMatchers
import org.apache.spark.SparkContext
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.util.LocalSparkContext
object GradientDescentSuite {
@@ -62,17 +63,7 @@ object GradientDescentSuite {
}
}
-class GradientDescentSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
test("Assert the loss is decreasing.") {
val nPoints = 10000
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 4e8dbde658..5dcec7dc3e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -23,10 +23,10 @@ import scala.util.Random
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-
import org.jblas._
+import org.apache.spark.mllib.util.LocalSparkContext
+
object ALSSuite {
def generateRatingsAsJavaList(
@@ -73,17 +73,7 @@ object ALSSuite {
}
-class ALSSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class ALSSuite extends FunSuite with LocalSparkContext {
test("rank-1 matrices") {
testALS(50, 100, 1, 15, 0.7, 0.3)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index b2c8df97a8..64e4cbb860 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -22,21 +22,9 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-
-class LassoSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class LassoSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 406afbaa3e..281f9df36d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -20,20 +20,9 @@ package org.apache.spark.mllib.regression
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-class LinearRegressionSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
-
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class LinearRegressionSuite extends FunSuite with LocalSparkContext {
def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index 1d6a10b66e..67dd06cc0f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -22,20 +22,10 @@ import org.jblas.DoubleMatrix
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext
-import org.apache.spark.mllib.util.LinearDataGenerator
+import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
-class RidgeRegressionSuite extends FunSuite with BeforeAndAfterAll {
- @transient private var sc: SparkContext = _
- override def beforeAll() {
- sc = new SparkContext("local", "test")
- }
-
- override def afterAll() {
- sc.stop()
- System.clearProperty("spark.driver.port")
- }
+class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
def predictionError(predictions: Seq[Double], input: Seq[LabeledPoint]) = {
predictions.zip(input).map { case (prediction, expected) =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
new file mode 100644
index 0000000000..7d840043e5
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalSparkContext.scala
@@ -0,0 +1,23 @@
+package org.apache.spark.mllib.util
+
+import org.scalatest.Suite
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.SparkContext
+
+trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
+ @transient var sc: SparkContext = _
+
+ override def beforeAll() {
+ sc = new SparkContext("local", "test")
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ if (sc != null) {
+ sc.stop()
+ }
+ System.clearProperty("spark.driver.port")
+ super.afterAll()
+ }
+}
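
A minimal sketch of a suite using this trait, mirroring the conversions elsewhere in this patch; the suite name and assertion are made up:

    package org.apache.spark.mllib.util

    import org.scalatest.FunSuite

    class LocalSparkContextExampleSuite extends FunSuite with LocalSparkContext {
      test("sc is created and stopped by the trait") {
        // sc is provided by LocalSparkContext, so no beforeAll/afterAll boilerplate is needed.
        val rdd = sc.parallelize(1 to 10, 2)
        assert(rdd.count() === 10)
      }
    }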
diff --git a/pom.xml b/pom.xml
index 54072b053c..1ac8f0fa07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -389,7 +389,7 @@
<dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
- <version>0.9</version>
+ <version>0.10</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a9f9937cb1..e33f230188 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -212,12 +212,13 @@ object SparkBuild extends Build {
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
- "com.novocode" % "junit-interface" % "0.9" % "test",
+ "com.novocode" % "junit-interface" % "0.10" % "test",
"org.easymock" % "easymock" % "3.1" % "test",
"org.mockito" % "mockito-all" % "1.8.5" % "test",
"commons-io" % "commons-io" % "2.4" % "test"
),
+ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
parallelExecution := true,
/* Workaround for issue #206 (fixed after SBT 0.11.0) */
watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
@@ -277,7 +278,6 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1",
- "com.typesafe" % "config" % "1.0.2",
"com.clearspring.analytics" % "stream" % "2.5.1"
)
)
@@ -317,7 +317,10 @@ object SparkBuild extends Build {
) ++ assemblySettings ++ extraAssemblySettings
def graphxSettings = sharedSettings ++ Seq(
- name := "spark-graphx"
+ name := "spark-graphx",
+ libraryDependencies ++= Seq(
+ "org.jblas" % "jblas" % "1.2.3"
+ )
)
def bagelSettings = sharedSettings ++ Seq(
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index 6a66bd1d06..4853be2617 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -20,5 +20,5 @@ import sbt._
object SparkPluginDef extends Build {
lazy val root = Project("plugins", file(".")) dependsOn(junitXmlListener)
/* This is not published in a Maven repository, so we get it from GitHub directly */
- lazy val junitXmlListener = uri("git://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
+ lazy val junitXmlListener = uri("https://github.com/ijuma/junit_xml_listener.git#fe434773255b451a38e8d889536ebc260f4225ce")
}
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index d72aed6a30..3870cd8f2b 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -61,14 +61,12 @@ class SparkConf(object):
Most of the time, you would create a SparkConf object with
C{SparkConf()}, which will load values from C{spark.*} Java system
- properties and any C{spark.conf} on your Spark classpath. In this
- case, system properties take priority over C{spark.conf}, and any
- parameters you set directly on the C{SparkConf} object take priority
- over both of those.
+ properties as well. In this case, any parameters you set directly on
+ the C{SparkConf} object take priority over system properties.
For unit tests, you can also call C{SparkConf(false)} to skip
loading external settings and get the same configuration no matter
- what is on the classpath.
+ what the system properties are.
All setter methods in this class support chaining. For example,
you can write C{conf.setMaster("local").setAppName("My app")}.
@@ -82,7 +80,7 @@ class SparkConf(object):
Create a new Spark configuration.
@param loadDefaults: whether to load values from Java system
- properties and classpath (True by default)
+ properties (True by default)
@param _jvm: internal parameter used to pass a handle to the
Java VM; does not need to be set by users
"""
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index b1a5df109b..b420d7a7f2 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -18,3 +18,13 @@
"""
Python bindings for MLlib.
"""
+
+# MLlib currently needs Python 2.7+ and NumPy 1.7+, so complain if lower
+
+import sys
+if sys.version_info[0:2] < (2, 7):
+ raise Exception("MLlib requires Python 2.7+")
+
+import numpy
+if numpy.version.version < '1.7':
+ raise Exception("MLlib requires NumPy 1.7+")
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 6fb4a7b3be..1ad4b52987 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -27,6 +27,7 @@ import traceback
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
+import warnings
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, pack_long
@@ -179,7 +180,7 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
- return self.mapPartitionsWithSplit(func, preservesPartitioning)
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
"""
@@ -191,10 +192,24 @@ class RDD(object):
[3, 7]
"""
def func(s, iterator): return f(iterator)
- return self.mapPartitionsWithSplit(func)
+ return self.mapPartitionsWithIndex(func)
+
+ def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
+ """
+ Return a new RDD by applying a function to each partition of this RDD,
+ while tracking the index of the original partition.
+
+ >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
+ >>> def f(splitIndex, iterator): yield splitIndex
+ >>> rdd.mapPartitionsWithIndex(f).sum()
+ 6
+ """
+ return PipelinedRDD(self, f, preservesPartitioning)
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
"""
+ Deprecated: use mapPartitionsWithIndex instead.
+
Return a new RDD by applying a function to each partition of this RDD,
while tracking the index of the original partition.
@@ -203,7 +218,9 @@ class RDD(object):
>>> rdd.mapPartitionsWithSplit(f).sum()
6
"""
- return PipelinedRDD(self, f, preservesPartitioning)
+ warnings.warn("mapPartitionsWithSplit is deprecated; "
+ "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+ return self.mapPartitionsWithIndex(f, preservesPartitioning)
def filter(self, f):
"""
@@ -235,7 +252,7 @@ class RDD(object):
>>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
"""
- return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
+ return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
# this is ported from scala/spark/RDD.scala
def takeSample(self, withReplacement, num, seed):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 7acb6eaf10..527104587f 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -152,6 +152,33 @@ class TestRDDFunctions(PySparkTestCase):
raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
+ def test_transforming_cartesian_result(self):
+ # Regression test for SPARK-1034
+ rdd1 = self.sc.parallelize([1, 2])
+ rdd2 = self.sc.parallelize([3, 4])
+ cart = rdd1.cartesian(rdd2)
+ result = cart.map(lambda (x, y): x + y).collect()
+
+ def test_cartesian_on_textfile(self):
+ # Regression test for
+ path = os.path.join(SPARK_HOME, "python/test_support/hello.txt")
+ a = self.sc.textFile(path)
+ result = a.cartesian(a).collect()
+ (x, y) = result[0]
+ self.assertEqual("Hello World!", x.strip())
+ self.assertEqual("Hello World!", y.strip())
+
+ def test_deleting_input_files(self):
+ # Regression test for SPARK-1025
+ tempFile = NamedTemporaryFile(delete=False)
+ tempFile.write("Hello World!")
+ tempFile.close()
+ data = self.sc.textFile(tempFile.name)
+ filtered_data = data.filter(lambda x: True)
+ self.assertEqual(1, filtered_data.count())
+ os.unlink(tempFile.name)
+ self.assertRaises(Exception, lambda: filtered_data.count())
+
class TestIO(PySparkTestCase):
diff --git a/python/run-tests b/python/run-tests
index 2005f610b4..a986ac9380 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -40,11 +40,11 @@ run_test "-m doctest pyspark/broadcast.py"
run_test "-m doctest pyspark/accumulators.py"
run_test "-m doctest pyspark/serializers.py"
run_test "pyspark/tests.py"
-#run_test "pyspark/mllib/_common.py"
-#run_test "pyspark/mllib/classification.py"
-#run_test "pyspark/mllib/clustering.py"
-#run_test "pyspark/mllib/recommendation.py"
-#run_test "pyspark/mllib/regression.py"
+run_test "pyspark/mllib/_common.py"
+run_test "pyspark/mllib/classification.py"
+run_test "pyspark/mllib/clustering.py"
+run_test "pyspark/mllib/recommendation.py"
+run_test "pyspark/mllib/regression.py"
if [[ $FAILED != 0 ]]; then
echo -en "\033[31m" # Red
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 26257e652e..5847b95e3f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -42,9 +42,15 @@ import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.conf.Configuration
/**
- * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
- * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
- * methods used to create DStream from various input sources.
+ * Main entry point for Spark Streaming functionality. It provides methods used to create
+ * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can either be
+ * created by providing a Spark master URL and an appName, or from an org.apache.spark.SparkConf
+ * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
+ * The associated SparkContext can be accessed using `context.sparkContext`. After
+ * creating and transforming DStreams, the streaming computation can be started and stopped
+ * using `context.start()` and `context.stop()`, respectively.
+ * `context.awaitTermination()` allows the current thread to wait for the termination
+ * of the context by `stop()` or by an exception.
*/
class StreamingContext private[streaming] (
sc_ : SparkContext,
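
As a quick illustration of the workflow the new class comment describes, here is a minimal sketch (not part of the patch) that builds a context from a SparkConf, wires up one input DStream, and then starts and waits on the computation; the app name, master URL, host and port are placeholder values:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc  = new StreamingContext(conf, Seconds(1))      // one batch per second

    // The associated SparkContext is available as ssc.sparkContext.
    val lines = ssc.socketTextStream("localhost", 9999)    // placeholder host/port
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()             // start receiving and processing data
    ssc.awaitTermination()  // block until stop() is called or an error occurs
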
@@ -63,7 +69,7 @@ class StreamingContext private[streaming] (
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
- * @param conf a [[org.apache.spark.SparkConf]] object specifying Spark parameters
+ * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
@@ -88,7 +94,7 @@ class StreamingContext private[streaming] (
}
/**
- * Re-create a StreamingContext from a checkpoint file.
+ * Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
* @param hadoopConf Optional, configuration object if necessary for reading from
* HDFS compatible filesystems
@@ -151,6 +157,7 @@ class StreamingContext private[streaming] (
private[streaming] val scheduler = new JobScheduler(this)
private[streaming] val waiter = new ContextWaiter
+
/**
* Return the associated Spark context
*/
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
index c92854ccd9..e23b725052 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStream.scala
@@ -27,22 +27,12 @@ import scala.reflect.ClassTag
import org.apache.spark.streaming.dstream.DStream
/**
- * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
- * sequence of RDDs (of the same type) representing a continuous stream of data (see [[org.apache.spark.rdd.RDD]]
- * for more details on RDDs). DStreams can either be created from live data (such as, data from
- * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations
- * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
- * DStream periodically generates a RDD, either from live data or by transforming the RDD generated
- * by a parent DStream.
- *
- * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
- * `window`. In addition, [[org.apache.spark.streaming.api.java.JavaPairDStream]] contains operations available
- * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`.
- *
- * DStreams internally is characterized by a few basic properties:
- * - A list of other DStreams that the DStream depends on
- * - A time interval at which the DStream generates an RDD
- * - A function that is used to generate an RDD after each time interval
+ * A Java-friendly interface to [[org.apache.spark.streaming.dstream.DStream]], the basic
+ * abstraction in Spark Streaming that represents a continuous stream of data.
+ * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume,
+ * etc.) or they can be generated by transforming existing DStreams using operations such as
+ * `map` and `window`. For operations applicable to key-value pair DStreams, see
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]].
*/
class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index a493a8279f..64fe204cdf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -138,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@@ -159,7 +159,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
- def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
+ def cm = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
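
For readers wondering what the new cast changes in practice, a small standalone sketch (not Spark code): the ClassTag's runtimeClass is what downstream array-creating operations see, and deriving it from AnyRef rather than Tuple2 yields Object arrays instead of Tuple2 arrays:

    import scala.reflect.ClassTag

    // Both casts are unchecked; only the underlying runtime class differs.
    val fromAnyRef = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[(String, Int)]]
    val fromTuple2 = implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[(String, Int)]]

    println(fromAnyRef.runtimeClass)          // class java.lang.Object
    println(fromTuple2.runtimeClass)          // class scala.Tuple2
    println(fromAnyRef.newArray(1).getClass)  // class [Ljava.lang.Object;
    println(fromTuple2.newArray(1).getClass)  // class [Lscala.Tuple2;
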
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 6bb985ca54..62cfa0a229 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -37,6 +37,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.streaming.dstream.DStream
+/**
+ * A Java-friendly interface to a DStream of key-value pairs, which provides extra methods
+ * like `reduceByKey` and `join`.
+ */
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifest: ClassTag[K],
implicit val vManifest: ClassTag[V])
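
To give a feel for the extra key-value methods the new comment refers to, a hedged sketch using the Scala pair-DStream operations that back this Java wrapper; `ssc` is an assumed existing StreamingContext, and the hosts, ports and line format are illustrative only:

    import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations

    val users  = ssc.socketTextStream("localhost", 9999)
      .map { line => val Array(id, name) = line.split(","); (id, name) }
    val clicks = ssc.socketTextStream("localhost", 9998)
      .map { line => val Array(id, url) = line.split(","); (id, url) }

    val clicksPerUser = clicks.map { case (id, _) => (id, 1) }.reduceByKey(_ + _)
    val joined        = users.join(clicks)  // DStream[(String, (String, String))]
    joined.print()
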
@@ -741,7 +745,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
override val classTag: ClassTag[(K, V)] =
- implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
+ implicitly[ClassTag[Tuple2[_, _]]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}
object JavaPairDStream {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 613683ca40..921b56143a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -22,7 +22,6 @@ import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.io.InputStream
-import java.lang.{Integer => JInt}
import java.util.{List => JList, Map => JMap}
import akka.actor.{Props, SupervisorStrategy}
@@ -39,19 +38,20 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.dstream.DStream
/**
- * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
- * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
- * methods used to create DStream from various input sources.
+ * A Java-friendly version of [[org.apache.spark.streaming.StreamingContext]] which is the main
+ * entry point for Spark Streaming functionality. It provides methods to create
+ * [[org.apache.spark.streaming.api.java.JavaDStream]] and
+ * [[org.apache.spark.streaming.api.java.JavaPairDStream]] from input sources. The internal
+ * org.apache.spark.api.java.JavaSparkContext (see core Spark documentation) can be accessed
+ * using `context.sparkContext`. After creating and transforming DStreams, the streaming
+ * computation can be started and stopped using `context.start()` and `context.stop()`,
+ * respectively. `context.awaitTermination()` allows the current thread to wait for the
+ * termination of the context by `stop()` or by an exception.
*/
class JavaStreamingContext(val ssc: StreamingContext) {
- // TODOs:
- // - Test to/from Hadoop functions
- // - Support creating and registering InputStreams
-
-
/**
- * Creates a StreamingContext.
+ * Create a StreamingContext.
* @param master Name of the Spark Master
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -60,7 +60,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, appName, batchDuration, null, Nil, Map()))
/**
- * Creates a StreamingContext.
+ * Create a StreamingContext.
* @param master Name of the Spark Master
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -77,7 +77,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
/**
- * Creates a StreamingContext.
+ * Create a StreamingContext.
* @param master Name of the Spark Master
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -94,7 +94,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, Map()))
/**
- * Creates a StreamingContext.
+ * Create a StreamingContext.
* @param master Name of the Spark Master
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
@@ -113,7 +113,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(master, appName, batchDuration, sparkHome, jars, environment))
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a JavaStreamingContext using an existing JavaSparkContext.
* @param sparkContext The underlying JavaSparkContext to use
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
@@ -121,7 +121,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(sparkContext.sc, batchDuration))
/**
- * Creates a StreamingContext using an existing SparkContext.
+ * Create a JavaStreamingContext using a SparkConf configuration.
* @param conf A Spark application configuration
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
@@ -129,19 +129,18 @@ class JavaStreamingContext(val ssc: StreamingContext) {
this(new StreamingContext(conf, batchDuration))
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*/
def this(path: String) = this(new StreamingContext(path, new Configuration))
/**
- * Re-creates a StreamingContext from a checkpoint file.
+ * Recreate a JavaStreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
*
*/
def this(path: String, hadoopConf: Configuration) = this(new StreamingContext(path, hadoopConf))
-
@deprecated("use sparkContext", "0.9.0")
val sc: JavaSparkContext = sparkContext
@@ -149,7 +148,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
val sparkContext = new JavaSparkContext(ssc.sc)
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, \n delimited
* lines.
* @param hostname Hostname to connect to for receiving data
@@ -162,7 +161,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as UTF8-encoded, \n delimited
* lines. Storage level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
* @param hostname Hostname to connect to for receiving data
@@ -173,7 +172,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream from network source hostname:port. Data is received using
+ * Create an input stream from network source hostname:port. Data is received using
* a TCP socket and the received bytes are interpreted as objects using the given
* converter.
* @param hostname Hostname to connect to for receiving data
@@ -195,7 +194,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
@@ -207,7 +206,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream from network source hostname:port, where data is received
+ * Create an input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
@@ -226,7 +225,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream from network source hostname:port, where data is received
+ * Create an input stream from network source hostname:port, where data is received
* as serialized blocks (serialized using the Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
@@ -241,7 +240,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Create a input stream that monitors a Hadoop-compatible filesystem
+ * Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
@@ -324,7 +323,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -340,7 +339,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
@@ -357,7 +356,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * Creates a input stream from an queue of RDDs. In each batch,
+ * Create an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
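
A brief, hedged sketch of the queue-based stream from the Scala side (`ssc` assumed to exist); note that the Scala queue keeps being polled after start(), while the Java overloads documented here copy the queue up front, which is presumably what the NOTE above is warning about:

    import scala.collection.mutable
    import org.apache.spark.rdd.RDD

    val rddQueue    = new mutable.SynchronizedQueue[RDD[Int]]()
    val inputStream = ssc.queueStream(rddQueue)   // by default, one queued RDD per batch
    inputStream.map(_ * 2).print()

    ssc.start()
    for (i <- 1 to 3) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
      Thread.sleep(1000)
    }
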
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 71a4c5c93e..6bff56a9d3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -37,8 +37,9 @@ import org.apache.spark.streaming.Duration
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see
* org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
- * DStreams can either be created from live data (such as, data from Kafka, Flume, sockets, HDFS)
- * or it can be generated by transforming existing DStreams using operations such as `map`,
+ * DStreams can either be created from live data (such as data from TCP sockets, Kafka, Flume,
+ * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or they can be generated by
+ * transforming existing DStreams using operations such as `map`,
* `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
* periodically generates a RDD, either from live data or by transforming the RDD generated by a
* parent DStream.
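
To make the transformation side of this description concrete, a hedged sketch of per-batch and windowed operations, assuming an existing `lines: DStream[String]` and a 1-second batch interval:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations

    val pairs = lines.flatMap(_.split(" ")).map(word => (word, 1))

    // Per batch: one RDD of counts is generated every batch interval.
    val counts = pairs.reduceByKey(_ + _)

    // Windowed: counts over the last 30 seconds, recomputed every 10 seconds.
    val windowed = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    windowed.print()
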
@@ -540,7 +541,6 @@ abstract class DStream[T: ClassTag] (
* on each RDD of 'this' DStream.
*/
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
- //new TransformedDStream(this, context.sparkContext.clean(transformFunc))
val cleanedF = context.sparkContext.clean(transformFunc)
val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
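
For reference, a short hedged sketch of this two-argument transform variant; `lines: DStream[String]` and `blacklist: RDD[String]` are assumed to exist:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.Time

    // The function receives each batch's RDD together with the batch time,
    // and arbitrary RDD-to-RDD code can run inside it.
    val cleaned = lines.transform { (rdd: RDD[String], time: Time) =>
      rdd.subtract(blacklist)
    }
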
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index f57762321c..fb9df2f48e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -18,20 +18,17 @@
package org.apache.spark.streaming.dstream
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream._
import org.apache.spark.{Partitioner, HashPartitioner}
import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.{ClassTags, RDD, PairRDDFunctions}
-import org.apache.spark.storage.StorageLevel
+import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer
-import scala.reflect.{ClassTag, classTag}
+import scala.reflect.ClassTag
-import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.{Time, Duration}
@@ -108,7 +105,7 @@ extends Serializable {
/**
* Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in
- * [[org.apache.spark.rdd.PairRDDFunctions]] for more information.
+ * org.apache.spark.rdd.PairRDDFunctions in the Spark core documentation for more information.
*/
def combineByKey[C: ClassTag](
createCombiner: V => C,
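
Since the signature is cut off by the hunk, a hedged usage sketch may help; `events: DStream[(String, Double)]` and the partition count are assumptions for illustration:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream operations

    // Per key and per batch, track (sum, count) so an average can be derived.
    val sumAndCount = events.combineByKey[(Double, Int)](
      (v: Double) => (v, 1),                                              // createCombiner
      (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2), // mergeCombiner
      new HashPartitioner(2))

    val averages = sumAndCount.mapValues { case (sum, count) => sum / count }
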
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
index fdf5371a89..79ed696814 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receivers/ActorReceiver.scala
@@ -44,40 +44,49 @@ object ReceiverSupervisorStrategy {
/**
* A receiver trait to be mixed in with your Actor to gain access to
- * pushBlock API.
+ * the API for pushing received data into Spark Streaming to be processed.
*
* Find more details at: http://spark-project.org/docs/latest/streaming-custom-receivers.html
*
* @example {{{
* class MyActor extends Actor with Receiver{
* def receive {
- * case anything :String => pushBlock(anything)
+ * case anything: String => pushBlock(anything)
* }
* }
- * //Can be plugged in actorStream as follows
+ *
+ * // Can be used with an actorStream as follows
* ssc.actorStream[String](Props(new MyActor),"MyActorReceiver")
*
* }}}
*
- * @note An important point to note:
- * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * @note Since the Actor may exist outside the Spark framework, it is the user's responsibility
* to ensure the type safety, i.e parametrized type of push block and InputDStream
* should be same.
- *
*/
-trait Receiver { self: Actor ⇒
+trait Receiver {
+
+ self: Actor ⇒ // to ensure that this can be added to Actor classes only
+
+ /**
+ * Push an iterator of received data into Spark Streaming for processing
+ */
def pushBlock[T: ClassTag](iter: Iterator[T]) {
context.parent ! Data(iter)
}
+ /**
+ * Push a single item of received data into Spark Streaming for processing
+ */
def pushBlock[T: ClassTag](data: T) {
context.parent ! Data(data)
}
-
}
/**
- * Statistics for querying the supervisor about state of workers
+ * Statistics for querying the supervisor about the state of workers. Used in
+ * conjunction with `StreamingContext.actorStream` and
+ * [[org.apache.spark.streaming.receivers.Receiver]].
*/
case class Statistics(numberOfMsgs: Int,
numberOfWorkers: Int,
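
Slightly expanding the Receiver @example above, a hedged end-to-end sketch; the actor class and stream name are illustrative only:

    import akka.actor.{Actor, Props}
    import org.apache.spark.streaming.receivers.Receiver

    class WordReceiver extends Actor with Receiver {
      def receive = {
        case line: String => pushBlock(line)  // pushBlock also accepts an Iterator[T]
      }
    }

    // With an existing StreamingContext `ssc`:
    val words = ssc.actorStream[String](Props(new WordReceiver), "WordReceiver")
    words.flatMap(_.split(" ")).print()
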
@@ -96,17 +105,15 @@ private[streaming] case class Data[T: ClassTag](data: T)
* his own Actor to run as receiver for Spark Streaming input source.
*
* This starts a supervisor actor which starts workers and also provides
- * [http://doc.akka.io/docs/akka/2.0.5/scala/fault-tolerance.html fault-tolerance].
+ * [http://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.html fault-tolerance].
*
- * Here's a way to start more supervisor/workers as its children.
+ * Here's a way to start more supervisors/workers as its children.
*
* @example {{{
* context.parent ! Props(new Supervisor)
* }}} OR {{{
- * context.parent ! Props(new Worker,"Worker")
+ * context.parent ! Props(new Worker, "Worker")
* }}}
- *
- *
*/
private[streaming] class ActorReceiver[T: ClassTag](
props: Props,
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 738ff986d8..1ac61124cb 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -532,15 +532,15 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryResource = Records.newRecord(classOf[Resource])
- memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, workerCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
for (i <- 0 until numWorkers) {
- requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
+ requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
}