-rw-r--r--  assembly/pom.xml | 24
-rw-r--r--  assembly/src/main/assembly/assembly.xml | 2
-rw-r--r--  conf/metrics.properties.template | 69
-rw-r--r--  core/pom.xml | 4
-rw-r--r--  core/src/main/resources/spark/ui/static/webui.css | 28
-rw-r--r--  core/src/main/scala/spark/Cache.scala | 80
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala | 14
-rw-r--r--  core/src/main/scala/spark/SoftReferenceCache.scala | 35
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala | 42
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala | 15
-rw-r--r--  core/src/main/scala/spark/Utils.scala | 55
-rw-r--r--  core/src/main/scala/spark/api/python/PythonPartitioner.scala | 25
-rw-r--r--  core/src/main/scala/spark/api/python/PythonWorkerFactory.scala | 4
-rw-r--r--  core/src/main/scala/spark/broadcast/HttpBroadcast.scala | 36
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala | 152
-rw-r--r--  core/src/main/scala/spark/deploy/JsonProtocol.scala | 11
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala | 18
-rw-r--r--  core/src/main/scala/spark/deploy/master/ApplicationInfo.scala | 7
-rw-r--r--  core/src/main/scala/spark/deploy/master/ApplicationSource.scala | 24
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 85
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala | 11
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/IndexPage.scala | 22
-rw-r--r--  core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala | 1
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala | 10
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala | 28
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala | 23
-rw-r--r--  core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala | 17
-rw-r--r--  core/src/main/scala/spark/io/CompressionCodec.scala | 82
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsConfig.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/MetricsSystem.scala | 26
-rw-r--r--  core/src/main/scala/spark/metrics/sink/ConsoleSink.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/sink/CsvSink.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/sink/JmxSink.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/sink/Sink.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/source/JvmSource.scala | 17
-rw-r--r--  core/src/main/scala/spark/metrics/source/Source.scala | 17
-rw-r--r--  core/src/main/scala/spark/rdd/HadoopRDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/rdd/NewHadoopRDD.scala | 1
-rw-r--r--  core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala | 63
-rw-r--r--  core/src/main/scala/spark/scheduler/DAGScheduler.scala | 8
-rw-r--r--  core/src/main/scala/spark/scheduler/JobLogger.scala | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/ResultTask.scala | 2
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala | 15
-rw-r--r--  core/src/main/scala/spark/scheduler/SparkListener.scala | 13
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskResult.scala | 19
-rw-r--r--  core/src/main/scala/spark/scheduler/TaskScheduler.scala | 7
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 25
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala | 15
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/Schedulable.scala | 6
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala | 19
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala | 9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala | 61
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala | 9
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 5
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalScheduler.scala | 20
-rw-r--r--  core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala | 10
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 27
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 17
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala | 18
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMaster.scala | 1
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMessages.scala | 163
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerSource.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessage.scala | 1
-rw-r--r--  core/src/main/scala/spark/storage/BlockMessageArray.scala | 5
-rw-r--r--  core/src/main/scala/spark/storage/BlockObjectWriter.scala | 2
-rw-r--r--  core/src/main/scala/spark/storage/DiskStore.scala | 1
-rw-r--r--  core/src/main/scala/spark/ui/UIUtils.scala | 22
-rw-r--r--  core/src/main/scala/spark/ui/UIWorkloadGenerator.scala | 44
-rw-r--r--  core/src/main/scala/spark/ui/env/EnvironmentUI.scala | 8
-rw-r--r--  core/src/main/scala/spark/ui/exec/ExecutorsUI.scala | 3
-rw-r--r--  core/src/main/scala/spark/ui/jobs/IndexPage.scala | 132
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressListener.scala | 167
-rw-r--r--  core/src/main/scala/spark/ui/jobs/JobProgressUI.scala | 122
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolPage.scala | 30
-rw-r--r--  core/src/main/scala/spark/ui/jobs/PoolTable.scala | 49
-rw-r--r--  core/src/main/scala/spark/ui/jobs/StageTable.scala | 116
-rw-r--r--  core/src/main/scala/spark/ui/storage/RDDPage.scala | 5
-rw-r--r--  core/src/main/scala/spark/util/Vector.scala | 5
-rw-r--r--  core/src/test/scala/spark/KryoSerializerSuite.scala | 43
-rw-r--r--  core/src/test/scala/spark/io/CompressionCodecSuite.scala | 62
-rw-r--r--  core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala | 6
-rw-r--r--  core/src/test/scala/spark/scheduler/JobLoggerSuite.scala | 2
-rw-r--r--  core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala | 2
-rw-r--r--  docs/configuration.md | 29
-rw-r--r--  docs/spark-standalone.md | 2
-rw-r--r--  examples/src/main/java/spark/examples/JavaPageRank.java | 116
-rwxr-xr-x  make-distribution.sh | 1
-rw-r--r--  mllib/pom.xml | 165
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala (renamed from mllib/src/main/scala/spark/mllib/classification/Classification.scala) | 0
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala | 34
-rw-r--r--  mllib/src/main/scala/spark/mllib/classification/SVM.scala | 15
-rw-r--r--  mllib/src/main/scala/spark/mllib/clustering/KMeans.scala | 9
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala | 15
-rw-r--r--  mllib/src/main/scala/spark/mllib/optimization/Updater.scala | 32
-rw-r--r--  mllib/src/main/scala/spark/mllib/recommendation/ALS.scala | 6
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/Lasso.scala | 15
-rw-r--r--  mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala (renamed from mllib/src/main/scala/spark/mllib/regression/Regression.scala) | 0
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala | 80
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala | 113
-rw-r--r--  mllib/src/main/scala/spark/mllib/util/MLUtils.scala | 4
-rw-r--r--  mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala | 7
-rw-r--r--  pom.xml | 6
-rw-r--r--  project/SparkBuild.scala | 11
-rw-r--r--  python/pyspark/rdd.py | 5
-rw-r--r--  repl/src/main/scala/spark/repl/SparkILoop.scala | 4
-rwxr-xr-x  run | 5
-rw-r--r--  streaming/src/main/scala/spark/streaming/Checkpoint.scala | 26
109 files changed, 2181 insertions, 962 deletions
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 1382539f24..cc5a4875af 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -62,43 +62,31 @@
<groupId>org.spark-project</groupId>
<artifactId>spark-core</artifactId>
<classifier>${classifier.name}</classifier>
- <version>0.8.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-bagel</artifactId>
<classifier>${classifier.name}</classifier>
- <version>0.8.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
- <artifactId>spark-examples</artifactId>
+ <artifactId>spark-mllib</artifactId>
<classifier>${classifier.name}</classifier>
- <version>0.8.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-examples</artifactId>
- <classifier>javadoc</classifier>
- <version>0.8.0-SNAPSHOT</version>
- </dependency>
- <dependency>
- <groupId>org.spark-project</groupId>
- <artifactId>spark-examples</artifactId>
- <classifier>sources</classifier>
- <version>0.8.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-repl</artifactId>
<classifier>${classifier.name}</classifier>
- <version>0.8.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project</groupId>
<artifactId>spark-streaming</artifactId>
<classifier>${classifier.name}</classifier>
- <version>0.8.0-SNAPSHOT</version>
+ <version>${project.version}</version>
</dependency>
</dependencies>
 </project>
\ No newline at end of file
diff --git a/assembly/src/main/assembly/assembly.xml b/assembly/src/main/assembly/assembly.xml
index dd05f35f1f..14485b7181 100644
--- a/assembly/src/main/assembly/assembly.xml
+++ b/assembly/src/main/assembly/assembly.xml
@@ -49,7 +49,7 @@
<include>org.spark-project:*:jar</include>
</includes>
<excludes>
- <exclude>org.spark-project:spark-dist:jar</exclude>
+ <exclude>org.spark-project:spark-assembly:jar</exclude>
</excludes>
</dependencySet>
<dependencySet>
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index 0486ca4c79..63a5a2093e 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -1,48 +1,45 @@
-# syntax: [instance].[sink|source].[name].[options]
-
-# "instance" specify "who" (the role) use metrics system. In spark there are
-# several roles like master, worker, executor, driver, these roles will
-# create metrics system for monitoring. So instance represents these roles.
-# Currently in Spark, several instances have already implemented: master,
-# worker, executor, driver.
-#
-# [instance] field can be "master", "worker", "executor", "driver", which means
-# only the specified instance has this property.
-# a wild card "*" can be used to represent instance name, which means all the
-# instances will have this property.
+# syntax: [instance].sink|source.[name].[options]=[value]
+
+# This file configures Spark's internal metrics system. The metrics system is
+# divided into instances which correspond to internal components.
+# Each instance can be configured to report its metrics to one or more sinks.
+# Accepted values for [instance] are "master", "worker", "executor", "driver",
+# and "applications". A wild card "*" can be used as an instance name, in
+# which case all instances will inherit the supplied property.
#
-# "source" specify "where" (source) to collect metrics data. In metrics system,
-# there exists two kinds of source:
-# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
-# collect Spark component's internal state, these sources are related to
-# instance and will be added after specific metrics system is created.
-# 2. Common source, like JvmSource, which will collect low level state, is
-# configured by configuration and loaded through reflection.
+# Within an instance, a "source" specifies a particular set of grouped metrics.
+# there are two kinds of sources:
+# 1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
+# collect a Spark component's internal state. Each instance is paired with a
+# Spark source that is added automatically.
+# 2. Common sources, like JvmSource, which will collect low level state.
+# These can be added through configuration options and are then loaded
+# using reflection.
#
-# "sink" specify "where" (destination) to output metrics data to. Several sinks
-# can be coexisted and flush metrics to all these sinks.
+# A "sink" specifies where metrics are delivered to. Each instance can be
+# assigned one or more sinks.
#
-# [sink|source] field specify this property is source related or sink, this
-# field can only be source or sink.
+# The sink|source field specifies whether the property relates to a sink or
+# source.
#
-# [name] field specify the name of source or sink, this is custom defined.
+# The [name] field specifies the name of source or sink.
#
-# [options] field is the specific property of this source or sink, this source
-# or sink is responsible for parsing this property.
+# The [options] field is the specific property of this source or sink. The
+# source or sink is responsible for parsing this property.
#
# Notes:
-# 1. Sinks should be added through configuration, like console sink, class
-# full name should be specified by class property.
-# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
-# it should be attention minimal polling period is 1 seconds, any period
-# below than 1s is illegal.
-# 3. Wild card property can be overlapped by specific instance property, for
-# example, *.sink.console.period can be overlapped by master.sink.console.period.
+# 1. To add a new sink, set the "class" option to a fully qualified class
+# name (see examples below).
+# 2. Some sinks involve a polling period. The minimum allowed polling period
+# is 1 second.
+# 3. Wild card properties can be overridden by more specific properties.
+# For example, master.sink.console.period takes precedence over
+# *.sink.console.period.
# 4. A metrics specific configuration
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
-# added to Java property using -Dspark.metrics.conf=xxx if you want to
-# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
-# metrics system will search and load it automatically.
+# added to Java properties using -Dspark.metrics.conf=xxx if you want to
+# customize metrics system. You can also put the file in ${SPARK_HOME}/conf
+# and it will be loaded automatically.
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
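As a hedged illustration of the syntax documented above (the ConsoleSink class ships with this change; the polling-period values are placeholders), a configuration could look like:

    # Enable ConsoleSink for all instances with a 10 second polling period
    *.sink.console.class=spark.metrics.sink.ConsoleSink
    *.sink.console.period=10
    # Override the polling period for the master instance only
    master.sink.console.period=20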
diff --git a/core/pom.xml b/core/pom.xml
index f0c936c86a..ba0071f582 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -49,6 +49,10 @@
<artifactId>compress-lzf</artifactId>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</dependency>
diff --git a/core/src/main/resources/spark/ui/static/webui.css b/core/src/main/resources/spark/ui/static/webui.css
index f7537bb766..fd2cbad004 100644
--- a/core/src/main/resources/spark/ui/static/webui.css
+++ b/core/src/main/resources/spark/ui/static/webui.css
@@ -47,3 +47,31 @@
padding-top: 7px;
padding-left: 4px;
}
+
+.table td {
+ vertical-align: middle !important;
+}
+
+.progress-completed .bar,
+.progress .bar-completed {
+ background-color: #b3def9;
+ background-image: -moz-linear-gradient(top, #addfff, #badcf2);
+ background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#addfff), to(#badcf2));
+ background-image: -webkit-linear-gradient(top, #addfff, #badcf2);
+ background-image: -o-linear-gradient(top, #addfff, #badcf2);
+ background-image: linear-gradient(to bottom, #addfff, #badcf2);
+ background-repeat: repeat-x;
+ filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffaddfff', endColorstr='#ffbadcf2', GradientType=0);
+}
+
+.progress-running .bar,
+.progress .bar-running {
+ background-color: #c2ebfa;
+ background-image: -moz-linear-gradient(top, #bdedff, #c7e8f5);
+ background-image: -webkit-gradient(linear, 0 0, 0 100%, from(#bdedff), to(#c7e8f5));
+ background-image: -webkit-linear-gradient(top, #bdedff, #c7e8f5);
+ background-image: -o-linear-gradient(top, #bdedff, #c7e8f5);
+ background-image: linear-gradient(to bottom, #bdedff, #c7e8f5);
+ background-repeat: repeat-x;
+ filter: progid:dximagetransform.microsoft.gradient(startColorstr='#ffbdedff', endColorstr='#ffc7e8f5', GradientType=0);
+}
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala
deleted file mode 100644
index b0c83ce59d..0000000000
--- a/core/src/main/scala/spark/Cache.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.concurrent.atomic.AtomicInteger
-
-private[spark] sealed trait CachePutResponse
-private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse
-private[spark] case class CachePutFailure() extends CachePutResponse
-
-/**
- * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
- * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware
- * of which entries are part of the same dataset (for example, partitions in the same RDD). The key
- * for each value in a cache is a (datasetID, partition) pair.
- *
- * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the
- * RDD split cache and the broadcast variable cache), to enable global replacement policies.
- * However, because these several independent modules all perform caching, it is important to give
- * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use
- * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first
- * ask for a KeySpace, and then call get() and put() on that space using its own keys.
- *
- * This abstract class handles the creation of key spaces, so that subclasses need only deal with
- * keys that are unique across modules.
- */
-private[spark] abstract class Cache {
- private val nextKeySpaceId = new AtomicInteger(0)
- private def newKeySpaceId() = nextKeySpaceId.getAndIncrement()
-
- def newKeySpace() = new KeySpace(this, newKeySpaceId())
-
- /**
- * Get the value for a given (datasetId, partition), or null if it is not
- * found.
- */
- def get(datasetId: Any, partition: Int): Any
-
- /**
- * Attempt to put a value in the cache; returns CachePutFailure if this was
- * not successful (e.g. because the cache replacement policy forbids it), and
- * CachePutSuccess if successful. If size estimation is available, the cache
- * implementation should set the size field in CachePutSuccess.
- */
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse
-
- /**
- * Report the capacity of the cache partition. By default this just reports
- * zero. Specific implementations can choose to provide the capacity number.
- */
- def getCapacity: Long = 0L
-}
-
-/**
- * A key namespace in a Cache.
- */
-private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) {
- def get(datasetId: Any, partition: Int): Any =
- cache.get((keySpaceId, datasetId), partition)
-
- def put(datasetId: Any, partition: Int, value: Any): CachePutResponse =
- cache.put((keySpaceId, datasetId), partition, value)
-
- def getCapacity: Long = cache.getCapacity
-}
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 660af70d52..6035bc075e 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -65,17 +65,9 @@ object Partitioner {
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions = partitions
- def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- } else {
- val mod = key.hashCode % partitions
- if (mod < 0) {
- mod + partitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ def getPartition(key: Any): Int = key match {
+ case null => 0
+ case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala
deleted file mode 100644
index f41a379582..0000000000
--- a/core/src/main/scala/spark/SoftReferenceCache.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import com.google.common.collect.MapMaker
-
-/**
- * An implementation of Cache that uses soft references.
- */
-private[spark] class SoftReferenceCache extends Cache {
- val map = new MapMaker().softValues().makeMap[Any, Any]()
-
- override def get(datasetId: Any, partition: Int): Any =
- map.get((datasetId, partition))
-
- override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
- map.put((datasetId, partition), value)
- return CachePutSuccess(0)
- }
-}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 77cb0ee0cd..40b30e4d23 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.util.DynamicVariable
import scala.collection.mutable.{ConcurrentMap, HashMap}
@@ -60,8 +61,10 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
-import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
+import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
+ SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
+ ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
@@ -125,6 +128,8 @@ class SparkContext(
private[spark] val ui = new SparkUI(this)
ui.bind()
+ val startTime = System.currentTimeMillis()
+
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach { addJar(_) }
@@ -262,12 +267,18 @@ class SparkContext(
localProperties.value = new Properties()
}
- def addLocalProperties(key: String, value: String) {
+ def addLocalProperty(key: String, value: String) {
if(localProperties.value == null) {
localProperties.value = new Properties()
}
localProperties.value.setProperty(key,value)
}
+
+ /** Set a human readable description of the current job. */
+ def setDescription(value: String) {
+ addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
+ }
+
// Post init
taskScheduler.postStartHook()
@@ -575,6 +586,28 @@ class SparkContext(
}
/**
+ * Return pools for fair scheduler
+ * TODO(xiajunluan): We should take nested pools into account
+ */
+ def getAllPools: ArrayBuffer[Schedulable] = {
+ taskScheduler.rootPool.schedulableQueue
+ }
+
+ /**
+ * Return the pool associated with the given name, if one exists
+ */
+ def getPoolForName(pool: String): Option[Schedulable] = {
+ taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+ }
+
+ /**
+ * Return current scheduling mode
+ */
+ def getSchedulingMode: SchedulingMode.SchedulingMode = {
+ taskScheduler.schedulingMode
+ }
+
+ /**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
@@ -816,6 +849,7 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
+ val SPARK_JOB_DESCRIPTION = "spark.job.description"
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
@@ -933,7 +967,6 @@ object SparkContext {
}
}
-
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
@@ -945,3 +978,4 @@ private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable
+
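A brief usage sketch of the SparkContext additions above; illustrative only, assuming an already-constructed SparkContext `sc` running with the fair scheduler:

    // Method names come from this diff; the pool name "production" is hypothetical.
    sc.setDescription("Nightly aggregation job")      // stored under spark.job.description
    val mode = sc.getSchedulingMode                   // e.g. SchedulingMode.FAIR
    val pools = sc.getAllPools                        // top-level pools (nested pools not yet handled)
    val maybePool = sc.getPoolForName("production")   // Option[Schedulable]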
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 4a1d341f5d..0adbf1d96e 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -97,13 +97,26 @@ class SparkEnv (
object SparkEnv extends Logging {
private val env = new ThreadLocal[SparkEnv]
+ @volatile private var lastSetSparkEnv : SparkEnv = _
def set(e: SparkEnv) {
+ lastSetSparkEnv = e
env.set(e)
}
+ /**
+ * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
+ * previously set in any thread.
+ */
def get: SparkEnv = {
- env.get()
+ Option(env.get()).getOrElse(lastSetSparkEnv)
+ }
+
+ /**
+ * Returns the ThreadLocal SparkEnv.
+ */
+ def getThreadLocal : SparkEnv = {
+ env.get()
}
def createFromSystemProperties(
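The change above makes SparkEnv.get fall back to the most recently set environment when the calling thread has none. A minimal stand-alone sketch of that pattern, independent of Spark:

    // ThreadLocal with a volatile "last set anywhere" fallback, mirroring SparkEnv.set/get above.
    object EnvHolder {
      private val local = new ThreadLocal[String]
      @volatile private var lastSet: String = _

      def set(value: String) {
        lastSet = value
        local.set(value)
      }

      // Prefer the value set in this thread; otherwise return the value last set in any thread.
      def get: String = Option(local.get()).getOrElse(lastSet)
    }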
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e6a96a5ec1..673f9a810d 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -33,8 +33,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-import spark.serializer.SerializerInstance
+import spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
import spark.deploy.SparkHadoopUtil
+import java.nio.ByteBuffer
/**
@@ -68,6 +69,47 @@ private object Utils extends Logging {
return ois.readObject.asInstanceOf[T]
}
+ /** Serialize via nested stream using specific serializer */
+ def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
+ val osWrapper = ser.serializeStream(new OutputStream {
+ def write(b: Int) = os.write(b)
+
+ override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
+ })
+ try {
+ f(osWrapper)
+ } finally {
+ osWrapper.close()
+ }
+ }
+
+ /** Deserialize via nested stream using specific serializer */
+ def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
+ val isWrapper = ser.deserializeStream(new InputStream {
+ def read(): Int = is.read()
+
+ override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
+ })
+ try {
+ f(isWrapper)
+ } finally {
+ isWrapper.close()
+ }
+ }
+
+ /**
+ * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
+ */
+ def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
+ if (bb.hasArray) {
+ out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+ } else {
+ val bbval = new Array[Byte](bb.remaining())
+ bb.get(bbval)
+ out.write(bbval)
+ }
+ }
+
def isAlpha(c: Char): Boolean = {
(c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
}
@@ -596,7 +638,7 @@ private object Utils extends Logging {
output.toString
}
- /**
+ /**
* A regular expression to match classes of the "core" Spark API that we want to skip when
* finding the call site of a method.
*/
@@ -756,4 +798,13 @@ private object Utils extends Logging {
}
return buf
}
+
+ /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
+ * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
+ * so function return (x % mod) + mod in that case.
+ */
+ def nonNegativeMod(x: Int, mod: Int): Int = {
+ val rawMod = x % mod
+ rawMod + (if (rawMod < 0) mod else 0)
+ }
}
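To see why nonNegativeMod is needed: the JVM's % operator keeps the sign of the dividend, so a negative hash code would otherwise yield a negative partition index. A small sketch:

    val hash = -7                                           // stand-in for a negative hashCode
    val partitions = 8
    println(hash % partitions)                              // -7: plain % can go negative
    println((hash % partitions + partitions) % partitions)  // 1: matches nonNegativeMod(-7, 8)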
diff --git a/core/src/main/scala/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
index 31a719fbff..ac112b8c2c 100644
--- a/core/src/main/scala/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/spark/api/python/PythonPartitioner.scala
@@ -18,7 +18,7 @@
package spark.api.python
import spark.Partitioner
-
+import spark.Utils
import java.util.Arrays
/**
@@ -35,25 +35,10 @@ private[spark] class PythonPartitioner(
val pyPartitionFunctionId: Long)
extends Partitioner {
- override def getPartition(key: Any): Int = {
- if (key == null) {
- return 0
- }
- else {
- val hashCode = {
- if (key.isInstanceOf[Array[Byte]]) {
- Arrays.hashCode(key.asInstanceOf[Array[Byte]])
- } else {
- key.hashCode()
- }
- }
- val mod = hashCode % numPartitions
- if (mod < 0) {
- mod + numPartitions
- } else {
- mod // Guard against negative hash codes
- }
- }
+ override def getPartition(key: Any): Int = key match {
+ case null => 0
+ case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+ case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
override def equals(other: Any): Boolean = other match {
diff --git a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
index 078ad45ce8..14f8320678 100644
--- a/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/spark/api/python/PythonWorkerFactory.scala
@@ -17,7 +17,7 @@
package spark.api.python
-import java.io.{DataInputStream, IOException}
+import java.io.{File, DataInputStream, IOException}
import java.net.{Socket, SocketException, InetAddress}
import scala.collection.JavaConversions._
@@ -67,6 +67,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
diff --git a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
index c565876950..138a8c21bc 100644
--- a/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -17,21 +17,20 @@
package spark.broadcast
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
-import java.io._
-import java.net._
-import java.util.UUID
+import java.io.{File, FileOutputStream, ObjectInputStream, OutputStream}
+import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import spark._
+import spark.{HttpServer, Logging, SparkEnv, Utils}
+import spark.io.CompressionCodec
import spark.storage.StorageLevel
-import util.{MetadataCleaner, TimeStampedHashSet}
+import spark.util.{MetadataCleaner, TimeStampedHashSet}
+
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
-extends Broadcast[T](id) with Logging with Serializable {
+ extends Broadcast[T](id) with Logging with Serializable {
def value = value_
@@ -85,6 +84,7 @@ private object HttpBroadcast extends Logging {
private val files = new TimeStampedHashSet[String]
private val cleaner = new MetadataCleaner("HttpBroadcast", cleanup)
+ private lazy val compressionCodec = CompressionCodec.createCodec()
def initialize(isDriver: Boolean) {
synchronized {
@@ -122,10 +122,12 @@ private object HttpBroadcast extends Logging {
def write(id: Long, value: Any) {
val file = new File(broadcastDir, "broadcast-" + id)
- val out: OutputStream = if (compress) {
- new LZFOutputStream(new FileOutputStream(file)) // Does its own buffering
- } else {
- new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ val out: OutputStream = {
+ if (compress) {
+ compressionCodec.compressedOutputStream(new FileOutputStream(file))
+ } else {
+ new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
@@ -136,10 +138,12 @@ private object HttpBroadcast extends Logging {
def read[T](id: Long): T = {
val url = serverUri + "/broadcast-" + id
- var in = if (compress) {
- new LZFInputStream(new URL(url).openStream()) // Does its own buffering
- } else {
- new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ val in = {
+ if (compress) {
+ compressionCodec.compressedInputStream(new URL(url).openStream())
+ } else {
+ new FastBufferedInputStream(new URL(url).openStream(), bufferSize)
+ }
}
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
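For reference, a hedged sketch of the CompressionCodec usage pattern seen above (method names are taken from this diff; which codec createCodec() selects, and whether the class is accessible outside the spark package, are assumptions):

    import java.io.{FileInputStream, FileOutputStream}
    import spark.io.CompressionCodec

    val codec = CompressionCodec.createCodec()                 // e.g. LZF or the newly added Snappy
    val out = codec.compressedOutputStream(new FileOutputStream("/tmp/example.bin"))
    out.write("hello".getBytes("UTF-8"))
    out.close()
    val in = codec.compressedInputStream(new FileInputStream("/tmp/example.bin"))
    // ... read the decompressed bytes back and close() when done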
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index e1f8aff6f5..31861f3ac2 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -17,109 +17,113 @@
package spark.deploy
+import scala.collection.immutable.List
+
+import spark.Utils
import spark.deploy.ExecutorState.ExecutorState
import spark.deploy.master.{WorkerInfo, ApplicationInfo}
import spark.deploy.worker.ExecutorRunner
-import scala.collection.immutable.List
-import spark.Utils
-private[spark] sealed trait DeployMessage extends Serializable
+private[deploy] sealed trait DeployMessage extends Serializable
-// Worker to Master
+private[deploy] object DeployMessages {
-private[spark]
-case class RegisterWorker(
- id: String,
- host: String,
- port: Int,
- cores: Int,
- memory: Int,
- webUiPort: Int,
- publicAddress: String)
- extends DeployMessage {
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
-}
+ // Worker to Master
-private[spark]
-case class ExecutorStateChanged(
- appId: String,
- execId: Int,
- state: ExecutorState,
- message: Option[String],
- exitStatus: Option[Int])
- extends DeployMessage
+ case class RegisterWorker(
+ id: String,
+ host: String,
+ port: Int,
+ cores: Int,
+ memory: Int,
+ webUiPort: Int,
+ publicAddress: String)
+ extends DeployMessage {
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
-private[spark] case class Heartbeat(workerId: String) extends DeployMessage
+ case class ExecutorStateChanged(
+ appId: String,
+ execId: Int,
+ state: ExecutorState,
+ message: Option[String],
+ exitStatus: Option[Int])
+ extends DeployMessage
-// Master to Worker
+ case class Heartbeat(workerId: String) extends DeployMessage
-private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+ // Master to Worker
-private[spark] case class LaunchExecutor(
- appId: String,
- execId: Int,
- appDesc: ApplicationDescription,
- cores: Int,
- memory: Int,
- sparkHome: String)
- extends DeployMessage
+ case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
-// Client to Master
+ case class RegisterWorkerFailed(message: String) extends DeployMessage
-private[spark] case class RegisterApplication(appDescription: ApplicationDescription)
- extends DeployMessage
+ case class KillExecutor(appId: String, execId: Int) extends DeployMessage
-// Master to Client
+ case class LaunchExecutor(
+ appId: String,
+ execId: Int,
+ appDesc: ApplicationDescription,
+ cores: Int,
+ memory: Int,
+ sparkHome: String)
+ extends DeployMessage
-private[spark]
-case class RegisteredApplication(appId: String) extends DeployMessage
+ // Client to Master
-private[spark]
-case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
- Utils.checkHostPort(hostPort, "Required hostport")
-}
+ case class RegisterApplication(appDescription: ApplicationDescription)
+ extends DeployMessage
-private[spark]
-case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
- exitStatus: Option[Int])
+ // Master to Client
-private[spark]
-case class ApplicationRemoved(message: String)
+ case class RegisteredApplication(appId: String) extends DeployMessage
-// Internal message in Client
+ case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
+ Utils.checkHostPort(hostPort, "Required hostport")
+ }
-private[spark] case object StopClient
+ case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String],
+ exitStatus: Option[Int])
-// MasterWebUI To Master
+ case class ApplicationRemoved(message: String)
-private[spark] case object RequestMasterState
+ // Internal message in Client
-// Master to MasterWebUI
+ case object StopClient
-private[spark]
-case class MasterState(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ // MasterWebUI To Master
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
+ case object RequestMasterState
- def uri = "spark://" + host + ":" + port
-}
+ // Master to MasterWebUI
+
+ case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+
+ def uri = "spark://" + host + ":" + port
+ }
+
+ // WorkerWebUI to Worker
+
+ case object RequestWorkerState
+
+ // Worker to WorkerWebUI
+
+ case class WorkerStateResponse(host: String, port: Int, workerId: String,
+ executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String,
+ cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
-// WorkerWebUI to Worker
-private[spark] case object RequestWorkerState
+ Utils.checkHost(host, "Required hostname")
+ assert (port > 0)
+ }
-// Worker to WorkerWebUI
+ // Actor System to Master
-private[spark]
-case class WorkerState(host: String, port: Int, workerId: String, executors: List[ExecutorRunner],
- finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
- coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) {
+ case object CheckForWorkerTimeOut
- Utils.checkHost(host, "Required hostname")
- assert (port > 0)
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index 64f89623e1..bd1db7c294 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -17,9 +17,12 @@
package spark.deploy
-import master.{ApplicationInfo, WorkerInfo}
import net.liftweb.json.JsonDSL._
-import worker.ExecutorRunner
+
+import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
+import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.deploy.worker.ExecutorRunner
+
private[spark] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo) = {
@@ -57,7 +60,7 @@ private[spark] object JsonProtocol {
("appdesc" -> writeApplicationDescription(obj.appDesc))
}
- def writeMasterState(obj: MasterState) = {
+ def writeMasterState(obj: MasterStateResponse) = {
("url" -> ("spark://" + obj.uri)) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
@@ -68,7 +71,7 @@ private[spark] object JsonProtocol {
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
}
- def writeWorkerState(obj: WorkerState) = {
+ def writeWorkerState(obj: WorkerStateResponse) = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
("masterwebuiurl" -> obj.masterWebUiUrl) ~
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index 29e494f495..9d5ba8a796 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -17,21 +17,23 @@
package spark.deploy.client
-import spark.deploy._
+import java.util.concurrent.TimeoutException
+
import akka.actor._
+import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Duration
-import akka.util.duration._
-import java.util.concurrent.TimeoutException
-import spark.{SparkException, Logging}
+import akka.remote.RemoteClientDisconnected
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
-import spark.deploy.RegisterApplication
-import spark.deploy.master.Master
-import akka.remote.RemoteClientDisconnected
-import akka.actor.Terminated
import akka.dispatch.Await
+import spark.Logging
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.Master
+
+
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
index 15ff919738..6dd2f06126 100644
--- a/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/ApplicationInfo.scala
@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0
var endTime = -1L
+ val appSource = new ApplicationSource(this)
private var nextExecutorId = 0
@@ -51,8 +52,10 @@ private[spark] class ApplicationInfo(
}
def removeExecutor(exec: ExecutorInfo) {
- executors -= exec.id
- coresGranted -= exec.cores
+ if (executors.contains(exec.id)) {
+ executors -= exec.id
+ coresGranted -= exec.cores
+ }
}
def coresLeft: Int = desc.maxCores - coresGranted
diff --git a/core/src/main/scala/spark/deploy/master/ApplicationSource.scala b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
new file mode 100644
index 0000000000..4df2b6bfdd
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ApplicationSource.scala
@@ -0,0 +1,24 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+ val metricRegistry = new MetricRegistry()
+ val sourceName = "%s.%s.%s".format("application", application.desc.name,
+ System.currentTimeMillis())
+
+ metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+ override def getValue: String = application.state.toString
+ })
+
+ metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+ override def getValue: Long = application.duration
+ })
+
+ metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+ override def getValue: Int = application.coresGranted
+ })
+
+}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 9692af5295..4a4d9908a0 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -17,27 +17,30 @@
package spark.deploy.master
-import akka.actor._
-import akka.actor.Terminated
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
-import akka.util.duration._
-
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import spark.deploy._
+import akka.actor._
+import akka.actor.Terminated
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.util.duration._
+
import spark.{Logging, SparkException, Utils}
+import spark.deploy.{ApplicationDescription, ExecutorState}
+import spark.deploy.DeployMessages._
+import spark.deploy.master.ui.MasterWebUI
import spark.metrics.MetricsSystem
import spark.util.AkkaUtils
-import ui.MasterWebUI
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
-
+ val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
+
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@@ -58,7 +61,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val metricsSystem = MetricsSystem.createMetricsSystem("master")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
val masterSource = new MasterSource(this)
val masterPublicAddress = {
@@ -76,15 +80,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
- context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
+ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
- metricsSystem.registerSource(masterSource)
- metricsSystem.start()
+ masterMetricsSystem.registerSource(masterSource)
+ masterMetricsSystem.start()
+ applicationMetricsSystem.start()
}
override def postStop() {
webUi.stop()
- metricsSystem.stop()
+ masterMetricsSystem.stop()
+ applicationMetricsSystem.stop()
}
override def receive = {
@@ -168,7 +174,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterState(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ }
+
+ case CheckForWorkerTimeOut => {
+ timeOutDeadWorkers()
}
}
@@ -233,20 +243,27 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
- exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
+ worker.actor ! LaunchExecutor(
+ exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+ exec.application.driver ! ExecutorAdded(
+ exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
publicAddress: String): WorkerInfo = {
- // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them.
- workers.filter(w => (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)).foreach(workers -= _)
+ // There may be one or more refs to dead workers on this same node (w/ different ID's),
+ // remove them.
+ workers.filter { w =>
+ (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)
+ }.foreach { w =>
+ workers -= w
+ }
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
- return worker
+ worker
}
def removeWorker(worker: WorkerInfo) {
@@ -257,7 +274,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
addressToWorker -= worker.actor.path.address
for (exec <- worker.executors.values) {
logInfo("Telling app of lost executor: " + exec.id)
- exec.application.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None)
+ exec.application.driver ! ExecutorUpdated(
+ exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
}
@@ -266,6 +284,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val now = System.currentTimeMillis()
val date = new Date(now)
val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
actorToApp(driver) = app
@@ -277,7 +296,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
- return app
+ app
}
def finishApplication(app: ApplicationInfo) {
@@ -291,7 +310,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
idToApp -= app.id
actorToApp -= app.driver
addressToApp -= app.driver.path.address
- completedApps += app // Remember it in our history
+ if (completedApps.size >= RETAINED_APPLICATIONS) {
+ val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+ completedApps.take(toRemove).foreach( a => {
+ applicationMetricsSystem.removeSource(a.appSource)
+ })
+ completedApps.trimStart(toRemove)
+ }
+ completedApps += app // Remember it in our history
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
@@ -316,12 +342,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
/** Check for, and remove, any timed-out workers */
def timeOutDeadWorkers() {
// Copy the workers into an array so we don't modify the hashset while iterating through it
- val expirationTime = System.currentTimeMillis() - WORKER_TIMEOUT
- val toRemove = workers.filter(_.lastHeartbeat < expirationTime).toArray
+ val currentTime = System.currentTimeMillis()
+ val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
for (worker <- toRemove) {
- logWarning("Removing %s because we got no heartbeat in %d seconds".format(
- worker.id, WORKER_TIMEOUT))
- removeWorker(worker)
+ if (worker.state != WorkerState.DEAD) {
+ logWarning("Removing %s because we got no heartbeat in %d seconds".format(
+ worker.id, WORKER_TIMEOUT/1000))
+ removeWorker(worker)
+ } else {
+ if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT))
+ workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
+ }
}
}
}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 32264af393..b4c62bc224 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,6 +17,8 @@
package spark.deploy.master.ui
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
@@ -25,9 +27,8 @@ import javax.servlet.http.HttpServletRequest
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestMasterState, JsonProtocol, MasterState}
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
import spark.ui.UIUtils
@@ -38,7 +39,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
@@ -49,7 +50,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index b05197c1b9..79fdb21024 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -17,26 +17,36 @@
package spark.deploy.master.ui
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.Node
+import net.liftweb.json.JsonAST.JValue
-import spark.deploy.{RequestMasterState, DeployWebUI, MasterState}
import spark.Utils
-import spark.ui.UIUtils
+import spark.deploy.DeployWebUI
+import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import spark.deploy.JsonProtocol
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import spark.ui.UIUtils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.master
implicit val timeout = parent.timeout
+ def renderJson(request: HttpServletRequest): JValue = {
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
+ val state = Await.result(stateFuture, 30 seconds)
+ JsonProtocol.writeMasterState(state)
+ }
+
/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState]
+ val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory")
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index dabc2d8dc7..31bdb7854e 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -61,6 +61,7 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
+ ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 47d3390928..345dfe879c 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -19,14 +19,12 @@ package spark.deploy.worker
import java.io._
import java.lang.System.getenv
-import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription}
+
import akka.actor.ActorRef
+
import spark.{Utils, Logging}
-import java.net.{URI, URL}
-import org.apache.hadoop.fs.{Path, FileSystem}
-import org.apache.hadoop.conf.Configuration
-import scala.Some
-import spark.deploy.ExecutorStateChanged
+import spark.deploy.{ExecutorState, ApplicationDescription}
+import spark.deploy.DeployMessages.ExecutorStateChanged
/**
* Manages the execution of one executor process.
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 8fa0d12b82..0e46fa281e 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -17,22 +17,24 @@
package spark.deploy.worker
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import java.text.SimpleDateFormat
+import java.util.Date
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
+
import spark.{Logging, Utils}
-import spark.util.AkkaUtils
-import spark.deploy._
-import spark.metrics.MetricsSystem
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.text.SimpleDateFormat
-import java.util.Date
-import spark.deploy.RegisterWorker
-import spark.deploy.LaunchExecutor
-import spark.deploy.RegisterWorkerFailed
+import spark.deploy.ExecutorState
+import spark.deploy.DeployMessages._
import spark.deploy.master.Master
-import java.io.File
-import ui.WorkerWebUI
+import spark.deploy.worker.ui.WorkerWebUI
+import spark.metrics.MetricsSystem
+import spark.util.AkkaUtils
+
private[spark] class Worker(
host: String,
@@ -164,7 +166,7 @@ private[spark] class Worker(
masterDisconnected()
case RequestWorkerState => {
- sender ! WorkerState(host, port, workerId, executors.values.toList,
+ sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
finishedExecutors.values.toList, masterUrl, cores, memory,
coresUsed, memoryUsed, masterWebUiUrl)
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 7548a26c2e..1619c6a4c2 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -17,34 +17,36 @@
package spark.deploy.worker.ui
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import javax.servlet.http.HttpServletRequest
-
import net.liftweb.json.JsonAST.JValue
-import scala.xml.Node
-
-import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState}
-import spark.deploy.worker.ExecutorRunner
import spark.Utils
+import spark.deploy.JsonProtocol
+import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
+import spark.deploy.worker.ExecutorRunner
import spark.ui.UIUtils
+
private[spark] class IndexPage(parent: WorkerWebUI) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -69,7 +71,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
</div>
- <hr/>
+ <hr/>
<div class="row"> <!-- Running Executors -->
<div class="span12">
@@ -88,7 +90,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</div>
</div>;
- UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
+ UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(
+ workerState.host, workerState.port))
}
def executorRow(executor: ExecutorRunner): Seq[Node] = {
diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
index f4003da732..e47fe50021 100644
--- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala
@@ -18,19 +18,16 @@
package spark.executor
import java.nio.ByteBuffer
-import spark.Logging
-import spark.TaskState.TaskState
-import spark.util.AkkaUtils
+
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue}
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.RegisteredExecutor
-import spark.scheduler.cluster.LaunchTask
-import spark.scheduler.cluster.RegisterExecutorFailed
-import spark.scheduler.cluster.RegisterExecutor
-import spark.Utils
+
+import spark.{Logging, Utils}
+import spark.TaskState.TaskState
import spark.deploy.SparkHadoopUtil
+import spark.scheduler.cluster.StandaloneClusterMessages._
+import spark.util.AkkaUtils
+
private[spark] class StandaloneExecutorBackend(
driverUrl: String,
diff --git a/core/src/main/scala/spark/io/CompressionCodec.scala b/core/src/main/scala/spark/io/CompressionCodec.scala
new file mode 100644
index 0000000000..0adebecadb
--- /dev/null
+++ b/core/src/main/scala/spark/io/CompressionCodec.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{InputStream, OutputStream}
+
+import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
+
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
+
+/**
+ * CompressionCodec allows the customization of choosing different compression implementations
+ * to be used in block storage.
+ */
+trait CompressionCodec {
+
+ def compressedOutputStream(s: OutputStream): OutputStream
+
+ def compressedInputStream(s: InputStream): InputStream
+}
+
+
+private[spark] object CompressionCodec {
+
+ def createCodec(): CompressionCodec = {
+ // Set the default codec to Snappy since the LZF implementation initializes a pretty large
+ // buffer for every stream, which results in a lot of memory overhead when the number of
+ // shuffle reduce buckets is large.
+ createCodec(classOf[SnappyCompressionCodec].getName)
+ }
+
+ def createCodec(codecName: String): CompressionCodec = {
+ Class.forName(
+ System.getProperty("spark.io.compression.codec", codecName),
+ true,
+ Thread.currentThread.getContextClassLoader).newInstance().asInstanceOf[CompressionCodec]
+ }
+}
+
+
+/**
+ * LZF implementation of [[spark.io.CompressionCodec]].
+ */
+class LZFCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ new LZFOutputStream(s).setFinishBlockOnFlush(true)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new LZFInputStream(s)
+}
+
+
+/**
+ * Snappy implementation of [[spark.io.CompressionCodec]].
+ * Block size can be configured by spark.io.compression.snappy.block.size.
+ */
+class SnappyCompressionCodec extends CompressionCodec {
+
+ override def compressedOutputStream(s: OutputStream): OutputStream = {
+ val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ new SnappyOutputStream(s, blockSize)
+ }
+
+ override def compressedInputStream(s: InputStream): InputStream = new SnappyInputStream(s)
+}
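A short usage sketch for the codec factory above; the property name and codec classes come from this file, while the sample payload is illustrative:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    import spark.io.{CompressionCodec, LZFCompressionCodec}

    // Override the default (Snappy) codec for this JVM, then round-trip some bytes.
    System.setProperty("spark.io.compression.codec", classOf[LZFCompressionCodec].getName)
    val codec = CompressionCodec.createCodec()

    val raw = "hello, spark".getBytes("UTF-8")
    val buffer = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(buffer)
    out.write(raw)
    out.close()

    val in = codec.compressedInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val decompressed = Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
    assert(new String(decompressed, "UTF-8") == "hello, spark")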
diff --git a/core/src/main/scala/spark/metrics/MetricsConfig.scala b/core/src/main/scala/spark/metrics/MetricsConfig.scala
index ed505b0aa7..3e32e9c82f 100644
--- a/core/src/main/scala/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/spark/metrics/MetricsConfig.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics
import java.util.Properties
diff --git a/core/src/main/scala/spark/metrics/MetricsSystem.scala b/core/src/main/scala/spark/metrics/MetricsSystem.scala
index 2f87577ff3..1dacafa135 100644
--- a/core/src/main/scala/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/spark/metrics/MetricsSystem.scala
@@ -1,6 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
@@ -76,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
}
}
+ def removeSource(source: Source) {
+ sources -= source
+ registry.removeMatching(new MetricFilter {
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+ })
+ }
+
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
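removeSource unregisters a source's metrics by name prefix. A standalone sketch of that prefix-based removal using the Codahale API directly; the metric names are made up:

    import com.codahale.metrics.{Gauge, Metric, MetricFilter, MetricRegistry}

    // Standalone illustration of the prefix match that removeSource applies.
    val registry = new MetricRegistry()
    registry.register("mySource.answer", new Gauge[Int] { def getValue: Int = 42 })
    registry.register("otherSource.answer", new Gauge[Int] { def getValue: Int = 7 })

    registry.removeMatching(new MetricFilter {
      def matches(name: String, metric: Metric): Boolean = name.startsWith("mySource")
    })
    assert(registry.getNames.size == 1)  // only "otherSource.answer" remains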
diff --git a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
index eaaac5d153..966ba37c20 100644
--- a/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/ConsoleSink.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.sink
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
diff --git a/core/src/main/scala/spark/metrics/sink/CsvSink.scala b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
index aa5bff0d34..cb990afdef 100644
--- a/core/src/main/scala/spark/metrics/sink/CsvSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/CsvSink.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.sink
import com.codahale.metrics.{CsvReporter, MetricRegistry}
diff --git a/core/src/main/scala/spark/metrics/sink/JmxSink.scala b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
index 6a40885b78..ee04544c0e 100644
--- a/core/src/main/scala/spark/metrics/sink/JmxSink.scala
+++ b/core/src/main/scala/spark/metrics/sink/JmxSink.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.sink
import com.codahale.metrics.{JmxReporter, MetricRegistry}
diff --git a/core/src/main/scala/spark/metrics/sink/Sink.scala b/core/src/main/scala/spark/metrics/sink/Sink.scala
index 3ffdcbdaba..dad1a7f0fe 100644
--- a/core/src/main/scala/spark/metrics/sink/Sink.scala
+++ b/core/src/main/scala/spark/metrics/sink/Sink.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.sink
trait Sink {
diff --git a/core/src/main/scala/spark/metrics/source/JvmSource.scala b/core/src/main/scala/spark/metrics/source/JvmSource.scala
index 79f505079c..e771008557 100644
--- a/core/src/main/scala/spark/metrics/source/JvmSource.scala
+++ b/core/src/main/scala/spark/metrics/source/JvmSource.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.source
import com.codahale.metrics.MetricRegistry
diff --git a/core/src/main/scala/spark/metrics/source/Source.scala b/core/src/main/scala/spark/metrics/source/Source.scala
index 5607e2c40a..76199a004b 100644
--- a/core/src/main/scala/spark/metrics/source/Source.scala
+++ b/core/src/main/scala/spark/metrics/source/Source.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark.metrics.source
import com.codahale.metrics.MetricRegistry
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index d0fdeb741e..fd00d59c77 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -88,6 +88,7 @@ class HadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
+ logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
val conf = confBroadcast.value.value
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index 17fe805fd4..0b71608169 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -73,6 +73,7 @@ class NewHadoopRDD[K, V](
override def compute(theSplit: Partition, context: TaskContext) = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
+ logInfo("Input split: " + split.serializableHadoopSplit)
val conf = confBroadcast.value.value
val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
diff --git a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
index 16ba0c26f8..33079cd539 100644
--- a/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/spark/rdd/ParallelCollectionRDD.scala
@@ -20,13 +20,15 @@ package spark.rdd
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
-import spark.{RDD, TaskContext, SparkContext, Partition}
+import spark._
+import java.io._
+import scala.Serializable
private[spark] class ParallelCollectionPartition[T: ClassManifest](
- val rddId: Long,
- val slice: Int,
- values: Seq[T])
- extends Partition with Serializable {
+ var rddId: Long,
+ var slice: Int,
+ var values: Seq[T])
+ extends Partition with Serializable {
def iterator: Iterator[T] = values.iterator
@@ -37,15 +39,49 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
case _ => false
}
- override val index: Int = slice
+ override def index: Int = slice
+
+ @throws(classOf[IOException])
+ private def writeObject(out: ObjectOutputStream): Unit = {
+
+ val sfactory = SparkEnv.get.serializer
+
+ // If the configured serializer is the Java serializer, use the default write path instead of
+ // nesting it, to avoid writing a separate serialization header.
+
+ sfactory match {
+ case js: JavaSerializer => out.defaultWriteObject()
+ case _ =>
+ out.writeLong(rddId)
+ out.writeInt(slice)
+
+ val ser = sfactory.newInstance()
+ Utils.serializeViaNestedStream(out, ser)(_.writeObject(values))
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(in: ObjectInputStream): Unit = {
+
+ val sfactory = SparkEnv.get.serializer
+ sfactory match {
+ case js: JavaSerializer => in.defaultReadObject()
+ case _ =>
+ rddId = in.readLong()
+ slice = in.readInt()
+
+ val ser = sfactory.newInstance()
+ Utils.deserializeViaNestedStream(in, ser)(ds => values = ds.readObject())
+ }
+ }
}
private[spark] class ParallelCollectionRDD[T: ClassManifest](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int,
- locationPrefs: Map[Int,Seq[String]])
- extends RDD[T](sc, Nil) {
+ locationPrefs: Map[Int, Seq[String]])
+ extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
// instead.
@@ -82,16 +118,17 @@ private object ParallelCollectionRDD {
1
}
slice(new Range(
- r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
+ r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
}
case r: Range => {
(0 until numSlices).map(i => {
val start = ((i * r.length.toLong) / numSlices).toInt
- val end = (((i+1) * r.length.toLong) / numSlices).toInt
+ val end = (((i + 1) * r.length.toLong) / numSlices).toInt
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}).asInstanceOf[Seq[Seq[T]]]
}
- case nr: NumericRange[_] => { // For ranges of Long, Double, BigInteger, etc
+ case nr: NumericRange[_] => {
+ // For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
var r = nr
@@ -102,10 +139,10 @@ private object ParallelCollectionRDD {
slices
}
case _ => {
- val array = seq.toArray // To prevent O(n^2) operations for List etc
+ val array = seq.toArray // To prevent O(n^2) operations for List etc
(0 until numSlices).map(i => {
val start = ((i * array.length.toLong) / numSlices).toInt
- val end = (((i+1) * array.length.toLong) / numSlices).toInt
+ val end = (((i + 1) * array.length.toLong) / numSlices).toInt
array.slice(start, end).toSeq
})
}
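The partition now writes its bulk data through the configured serializer instead of plain Java serialization. A self-contained sketch of the same private writeObject/readObject hook pattern, with a trivial UTF encoding standing in for the pluggable serializer:

    import java.io._

    class Payload(var id: Long, var values: Seq[String]) extends Serializable {
      @throws(classOf[IOException])
      private def writeObject(out: ObjectOutputStream): Unit = {
        // Write the simple fields directly, then hand the bulk data to a custom encoding.
        out.writeLong(id)
        out.writeInt(values.size)
        values.foreach(out.writeUTF)
      }

      @throws(classOf[IOException])
      private def readObject(in: ObjectInputStream): Unit = {
        id = in.readLong()
        val n = in.readInt()
        values = (1 to n).map(_ => in.readUTF())
      }
    }

    // Round trip through Java serialization; the hooks above control the wire format.
    val buffer = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(buffer)
    oos.writeObject(new Payload(7L, Seq("a", "b")))
    oos.flush()
    val ois = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    val copy = ois.readObject().asInstanceOf[Payload]
    assert(copy.id == 7L && copy.values == Seq("a", "b"))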
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 9b45fc2938..89c51a44c9 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -510,6 +510,12 @@ class DAGScheduler(
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
+ // Listeners must be notified of the stage submission before the serialization check below can
+ // throw a NotSerializableException, so that "StageSubmitted" is posted before "JobEnded".
+ val properties = idToActiveJob(stage.priority).properties
+ sparkListeners.foreach(_.onStageSubmitted(
+ SparkListenerStageSubmitted(stage, tasks.size, properties)))
+
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
@@ -524,11 +530,9 @@ class DAGScheduler(
return
}
- sparkListeners.foreach(_.onStageSubmitted(SparkListenerStageSubmitted(stage, tasks.size)))
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
- val properties = idToActiveJob(stage.priority).properties
taskSched.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
if (!stage.submissionTime.isDefined) {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index f7565b8c57..ad2efcec63 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import spark._
+import spark.SparkContext
import spark.executor.TaskMetrics
import spark.scheduler.cluster.TaskInfo
@@ -62,7 +63,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
event match {
case SparkListenerJobStart(job, properties) =>
processJobStartEvent(job, properties)
- case SparkListenerStageSubmitted(stage, taskSize) =>
+ case SparkListenerStageSubmitted(stage, taskSize, properties) =>
processStageSubmittedEvent(stage, taskSize)
case StageCompleted(stageInfo) =>
processStageCompletedEvent(stageInfo)
@@ -317,8 +318,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordJobProperties(jobID: Int, properties: Properties) {
if(properties != null) {
- val annotation = properties.getProperty("spark.job.annotation", "")
- jobLogInfo(jobID, annotation, false)
+ val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+ jobLogInfo(jobID, description, false)
}
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 361b1e6b91..1ced6f9524 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -118,6 +118,7 @@ private[spark] class ResultTask[T, U](
out.write(bytes)
out.writeInt(partition)
out.writeInt(outputId)
+ out.writeLong(generation)
out.writeObject(split)
}
}
@@ -132,6 +133,7 @@ private[spark] class ResultTask[T, U](
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
+ generation = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 1c25605f75..e3bb6d1e60 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -18,16 +18,9 @@
package spark.scheduler
import java.io._
-import java.util.{HashMap => JHashMap}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-
-import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-
-import com.ning.compress.lzf.LZFInputStream
-import com.ning.compress.lzf.LZFOutputStream
+import scala.collection.mutable.HashMap
import spark._
import spark.executor.ShuffleWriteMetrics
@@ -109,11 +102,7 @@ private[spark] class ShuffleMapTask(
preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
}
- var split = if (rdd == null) {
- null
- } else {
- rdd.partitions(partition)
- }
+ var split = if (rdd == null) null else rdd.partitions(partition)
override def writeExternal(out: ObjectOutput) {
RDDCheckpointData.synchronized {
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 4eb7e4e6a5..2a09a956ad 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -25,7 +25,8 @@ import spark.executor.TaskMetrics
sealed trait SparkListenerEvents
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int) extends SparkListenerEvents
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+ extends SparkListenerEvents
case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
@@ -34,10 +35,10 @@ case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends Spa
case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
taskMetrics: TaskMetrics) extends SparkListenerEvents
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
extends SparkListenerEvents
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
extends SparkListenerEvents
trait SparkListener {
@@ -45,7 +46,7 @@ trait SparkListener {
* Called when a stage is completed, with information on the completed stage
*/
def onStageCompleted(stageCompleted: StageCompleted) { }
-
+
/**
* Called when a stage is submitted
*/
@@ -65,12 +66,12 @@ trait SparkListener {
* Called when a job starts
*/
def onJobStart(jobStart: SparkListenerJobStart) { }
-
+
/**
* Called when a job ends
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-
+
}
/**
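SparkListenerStageSubmitted now carries the submitting job's Properties. A rough sketch of a listener that inspects them; registration via sc.addSparkListener is assumed, and the class name is hypothetical:

    import java.util.Properties
    import spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

    // A listener that inspects the per-job Properties now attached to stage-submitted events.
    class StagePropertiesListener extends SparkListener {
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        val props: Properties = stageSubmitted.properties
        val keys = if (props == null) "<none>" else props.stringPropertyNames().toString
        println("Stage submitted with " + stageSubmitted.taskSize + " tasks, job properties: " + keys)
      }
    }

    // sc.addSparkListener(new StagePropertiesListener())  // register on a live SparkContext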
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index dc0621ea7b..89793e0e82 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -21,6 +21,8 @@ import java.io._
import scala.collection.mutable.Map
import spark.executor.TaskMetrics
+import spark.{Utils, SparkEnv}
+import java.nio.ByteBuffer
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
@@ -30,7 +32,13 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
def this() = this(null.asInstanceOf[T], null, null)
override def writeExternal(out: ObjectOutput) {
- out.writeObject(value)
+
+ val objectSer = SparkEnv.get.serializer.newInstance()
+ val bb = objectSer.serialize(value)
+
+ out.writeInt(bb.remaining())
+ Utils.writeByteBuffer(bb, out)
+
out.writeInt(accumUpdates.size)
for ((key, value) <- accumUpdates) {
out.writeLong(key)
@@ -40,7 +48,14 @@ class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics:
}
override def readExternal(in: ObjectInput) {
- value = in.readObject().asInstanceOf[T]
+
+ val objectSer = SparkEnv.get.serializer.newInstance()
+
+ val blen = in.readInt()
+ val byteVal = new Array[Byte](blen)
+ in.readFully(byteVal)
+ value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+
val numUpdates = in.readInt
if (numUpdates == 0) {
accumUpdates = null
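TaskResult now writes the value through the configured serializer as a length-prefixed byte block. A standalone sketch of that length-prefix pattern with Externalizable, using plain UTF-8 bytes in place of the Spark serializer:

    import java.io._

    class Boxed(var value: String) extends Externalizable {
      def this() = this(null)  // no-arg constructor required by Externalizable

      override def writeExternal(out: ObjectOutput) {
        val bytes = value.getBytes("UTF-8")
        out.writeInt(bytes.length)   // length prefix so the reader knows how many bytes to pull back
        out.write(bytes)
      }

      override def readExternal(in: ObjectInput) {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        value = new String(bytes, "UTF-8")
      }
    }

    // Round-tripping an instance through ObjectOutputStream/ObjectInputStream preserves `value`.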
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
index 5188308006..4943d58e25 100644
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,8 @@
package spark.scheduler
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
@@ -25,6 +27,11 @@ package spark.scheduler
* the TaskSchedulerListener interface.
*/
private[spark] trait TaskScheduler {
+
+ def rootPool: Pool
+
+ def schedulingMode: SchedulingMode
+
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 7c10074dc7..96568e0d27 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.HashSet
import spark._
import spark.TaskState.TaskState
import spark.scheduler._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
import java.util.{TimerTask, Timer}
@@ -114,6 +115,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
+ // The default scheduling mode is FIFO
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.cluster.schedulingmode", "FIFO"))
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
@@ -121,15 +125,13 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
def initialize(context: SchedulerBackend) {
backend = context
- //default scheduler is FIFO
- val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
- //temporarily set rootPool name to empty
- rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+ // temporarily set rootPool name to empty
+ rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
- case "FIFO" =>
+ case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
- case "FAIR" =>
+ case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
@@ -204,7 +206,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
- "check your cluster UI to ensure that workers are registered")
+ "check your cluster UI to ensure that workers are registered " +
+ "and have sufficient memory")
} else {
this.cancel()
}
@@ -270,10 +273,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
var launchedTask = false
val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
- for (manager <- sortedTaskSetQueue)
- {
- logInfo("parentName:%s,name:%s,runningTasks:%s".format(manager.parent.name, manager.name, manager.runningTasks))
+
+ for (manager <- sortedTaskSetQueue) {
+ logDebug("parentName:%s, name:%s, runningTasks:%s".format(
+ manager.parent.name, manager.name, manager.runningTasks))
}
+
for (manager <- sortedTaskSetQueue) {
// Split offers based on node local, rack local and off-rack tasks.
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index bbf234febd..7f855cd345 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -85,14 +85,15 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = 4
+ val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
- val ser = SparkEnv.get.closureSerializer.newInstance()
+ val env = SparkEnv.get
+ val ser = env.closureSerializer.newInstance()
val tasks = taskSet.tasks
val numTasks = tasks.length
@@ -107,9 +108,8 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
- var name = "TaskSet_" + taskSet.stageId.toString
+ var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
-
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -535,6 +535,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
taskFinished(tid, state, serializedData)
@@ -697,18 +698,18 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
}
- // TODO: for now we just find Pool not TaskSetManager,
+ // TODO(xiajunluan): for now we just find Pool not TaskSetManager
// we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
override def addSchedulable(schedulable:Schedulable) {
- //nothing
+ // nothing
}
override def removeSchedulable(schedulable:Schedulable) {
- //nothing
+ // nothing
}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
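MAX_TASK_FAILURES is now configurable. A one-line sketch of raising it; the value 8 is arbitrary and the property must be set before the scheduler is created:

    // Allow up to 8 failures per task before failing the job (default remains 4).
    System.setProperty("spark.task.maxFailures", "8")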
diff --git a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
index f557b142c4..e77e8e4162 100644
--- a/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -17,14 +17,18 @@
package spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import scala.collection.mutable.ArrayBuffer
/**
* An interface for schedulable entities.
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
var parent: Schedulable
+ // child queues
+ def schedulableQueue: ArrayBuffer[Schedulable]
+ def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
def runningTasks: Int
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
index 95554023c0..b2d089f31d 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulableBuilder.scala
@@ -41,10 +41,11 @@ private[spark] trait SchedulableBuilder {
def addTaskSetManager(manager: Schedulable, properties: Properties)
}
-private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
+ extends SchedulableBuilder with Logging {
override def buildPools() {
- //nothing
+ // nothing
}
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
@@ -52,7 +53,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends SchedulableBuilder with Logging {
+private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+ extends SchedulableBuilder with Logging {
val schedulerAllocFile = System.getProperty("spark.fairscheduler.allocation.file","unspecified")
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.cluster.fair.pool"
@@ -103,9 +105,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
}
}
- //finally create "default" pool
+ // finally create "default" pool
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
- val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+ DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Create default pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
@@ -119,8 +122,10 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool) extends Schedula
poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
parentPool = rootPool.getSchedulableByName(poolName)
if (parentPool == null) {
- //we will create a new pool that user has configured in app instead of being defined in xml file
- parentPool = new Pool(poolName,DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+ // Create a new pool that the user configured in the application
+ // rather than one defined in the XML file
+ parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
+ DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(parentPool)
logInfo("Create pool with name:%s,schedulingMode:%s,minShare:%d,weight:%d".format(
poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
index 4b3e3e50e1..55cdf4791f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulingMode.scala
@@ -17,8 +17,13 @@
package spark.scheduler.cluster
-object SchedulingMode extends Enumeration("FAIR","FIFO"){
+/**
+ * "FAIR" and "FIFO" determines which policy is used
+ * to order tasks amongst a Schedulable's sub-queues
+ * "NONE" is used when the a Schedulable has no sub-queues.
+ */
+object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
type SchedulingMode = Value
- val FAIR,FIFO = Value
+ val FAIR, FIFO, NONE = Value
}
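With NONE added, callers can state explicitly that a leaf Schedulable has no sub-queues. A small sketch of parsing and matching the mode, mirroring the property read by the schedulers:

    import spark.scheduler.cluster.SchedulingMode

    val mode = SchedulingMode.withName(System.getProperty("spark.cluster.schedulingmode", "FIFO"))
    mode match {
      case SchedulingMode.FIFO => println("tasks ordered by stage/job priority")
      case SchedulingMode.FAIR => println("tasks ordered by fair share across pools")
      case SchedulingMode.NONE => println("leaf node: no sub-queues to order")
    }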
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index ac9e5ef94d..05c29eb72f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -17,46 +17,47 @@
package spark.scheduler.cluster
-import spark.TaskState.TaskState
import java.nio.ByteBuffer
-import spark.util.SerializableBuffer
+
+import spark.TaskState.TaskState
import spark.Utils
+import spark.util.SerializableBuffer
+
private[spark] sealed trait StandaloneClusterMessage extends Serializable
-// Driver to executors
-private[spark]
-case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+private[spark] object StandaloneClusterMessages {
-private[spark]
-case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
- extends StandaloneClusterMessage
+ // Driver to executors
+ case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark]
-case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
+ case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
+ extends StandaloneClusterMessage
-// Executors to driver
-private[spark]
-case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
- extends StandaloneClusterMessage {
- Utils.checkHostPort(hostPort, "Expected host port")
-}
+ case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage
-private[spark]
-case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
- extends StandaloneClusterMessage
+ // Executors to driver
+ case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+ extends StandaloneClusterMessage {
+ Utils.checkHostPort(hostPort, "Expected host port")
+ }
+
+ case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
+ data: SerializableBuffer) extends StandaloneClusterMessage
-private[spark]
-object StatusUpdate {
- /** Alternate factory method that takes a ByteBuffer directly for the data field */
- def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
- StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ object StatusUpdate {
+ /** Alternate factory method that takes a ByteBuffer directly for the data field */
+ def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer)
+ : StatusUpdate = {
+ StatusUpdate(executorId, taskId, state, new SerializableBuffer(data))
+ }
}
-}
-// Internal messages in driver
-private[spark] case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case object StopDriver extends StandaloneClusterMessage
+ // Internal messages in driver
+ case object ReviveOffers extends StandaloneClusterMessage
-private[spark] case class RemoveExecutor(executorId: String, reason: String)
- extends StandaloneClusterMessage
+ case object StopDriver extends StandaloneClusterMessage
+
+ case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage
+
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 03a64e0192..075a7cbf7e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -17,17 +17,18 @@
package spark.scheduler.cluster
+import java.util.concurrent.atomic.AtomicInteger
+
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
-import akka.util.duration._
+import akka.dispatch.Await
import akka.pattern.ask
+import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration
import spark.{Utils, SparkException, Logging, TaskState}
-import akka.dispatch.Await
-import java.util.concurrent.atomic.AtomicInteger
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import spark.scheduler.cluster.StandaloneClusterMessages._
/**
* A standalone scheduler backend, which waits for standalone executors to connect to it through
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 7978a5df74..1a92a5ed6f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -23,7 +23,10 @@ import spark.TaskState.TaskState
import spark.scheduler.TaskSet
private[spark] trait TaskSetManager extends Schedulable {
-
+ def schedulableQueue = null
+
+ def schedulingMode = SchedulingMode.NONE
+
def taskSet: TaskSet
def slaveOffer(
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index edd83d4cb4..f274b1a767 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -29,6 +29,7 @@ import spark.TaskState.TaskState
import spark.executor.ExecutorURLClassLoader
import spark.scheduler._
import spark.scheduler.cluster._
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
import akka.actor._
/**
@@ -85,6 +86,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
+ val schedulingMode: SchedulingMode = SchedulingMode.withName(
+ System.getProperty("spark.cluster.schedulingmode", "FIFO"))
val activeTaskSets = new HashMap[String, TaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -92,15 +95,13 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var localActor: ActorRef = null
override def start() {
- //default scheduler is FIFO
- val schedulingMode = System.getProperty("spark.cluster.schedulingmode", "FIFO")
- //temporarily set rootPool name to empty
- rootPool = new Pool("", SchedulingMode.withName(schedulingMode), 0, 0)
+ // temporarily set rootPool name to empty
+ rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
- case "FIFO" =>
+ case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
- case "FAIR" =>
+ case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool)
}
}
@@ -168,7 +169,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// Set the Spark execution environment for the worker thread
SparkEnv.set(env)
val ser = SparkEnv.get.closureSerializer.newInstance()
- var attemptedTask: Option[Task[_]] = None
+ val objectSer = SparkEnv.get.serializer.newInstance()
+ var attemptedTask: Option[Task[_]] = None
val start = System.currentTimeMillis()
var taskStart: Long = 0
try {
@@ -192,9 +194,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
// executor does. This is useful to catch serialization errors early
// on in development (so when users move their local Spark programs
// to the cluster, they don't get surprised by serialization errors).
- val serResult = ser.serialize(result)
+ val serResult = objectSer.serialize(result)
deserializedTask.metrics.get.resultSize = serResult.limit()
- val resultToReturn = ser.deserialize[Any](serResult)
+ val resultToReturn = objectSer.deserialize[Any](serResult)
val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
ser.serialize(Accumulators.values))
val serviceTime = System.currentTimeMillis() - taskStart
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index b29740c886..c38eeb9e11 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -42,7 +42,8 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val taskInfos = new HashMap[Long, TaskInfo]
val numTasks = taskSet.tasks.size
var numFinished = 0
- val ser = SparkEnv.get.closureSerializer.newInstance()
+ val env = SparkEnv.get
+ val ser = env.closureSerializer.newInstance()
val copiesRunning = new Array[Int](numTasks)
val finished = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
@@ -63,11 +64,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def addSchedulable(schedulable: Schedulable): Unit = {
- //nothing
+ // nothing
}
override def removeSchedulable(schedulable: Schedulable): Unit = {
- //nothing
+ // nothing
}
override def getSchedulableByName(name: String): Schedulable = {
@@ -75,7 +76,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def executorLost(executorId: String, host: String): Unit = {
- //nothing
+ // nothing
}
override def checkSpeculatableTasks() = true
@@ -143,6 +144,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
}
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
taskEnded(tid, state, serializedData)
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bc6040544..6ebbb5ec9b 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -110,12 +110,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val runScript = new File(sparkHome, "run").getCanonicalPath
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
- val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -123,7 +117,26 @@ private[spark] class CoarseMesosSchedulerBackend(
.setValue(value)
.build())
}
- return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"),
+ System.getProperty("spark.driver.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ val runScript = new File(sparkHome, "run").getCanonicalPath
+ command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./run spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
+ basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
+ return command.build()
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
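The Mesos backends can now fetch a prebuilt Spark distribution from spark.executor.uri and cd into the unpacked directory via a prefix glob. A tiny sketch of the basename computation used above; the URI is a placeholder:

    val uri = "http://example.com/dist/spark-0.8.0-incubating.tgz"   // placeholder URI
    val basename = uri.split('/').last.split('.').head               // yields "spark-0"
    val cd = "cd %s*".format(basename)                               // globs the unpacked spark-0.8.0-incubating/ dir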
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index 75b8268b55..f6069a5775 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -89,7 +89,6 @@ private[spark] class MesosSchedulerBackend(
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -97,15 +96,23 @@ private[spark] class MesosSchedulerBackend(
.setValue(value)
.build())
}
+ val command = CommandInfo.newBuilder()
+ .setEnvironment(environment)
+ val uri = System.getProperty("spark.executor.uri")
+ if (uri == null) {
+ command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
+ } else {
+ // Grab everything to the first '.'. We'll use that and '*' to
+ // glob the directory "correctly".
+ val basename = uri.split('/').last.split('.').head
+ command.setValue("cd %s*; ./spark-executor".format(basename))
+ command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
+ }
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
.build()
- val command = CommandInfo.newBuilder()
- .setValue(execScript)
- .setEnvironment(environment)
- .build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index e4ffa57ad2..3a72474419 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -27,11 +27,10 @@ import akka.dispatch.{Await, Future}
import akka.util.Duration
import akka.util.duration._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
-
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import spark.{Logging, SparkEnv, SparkException, Utils}
+import spark.io.CompressionCodec
import spark.network._
import spark.serializer.Serializer
import spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
@@ -158,6 +157,13 @@ private[spark] class BlockManager(
val metadataCleaner = new MetadataCleaner("BlockManager", this.dropOldBlocks)
initialize()
+ // The compression codec to use. Note that the "lazy" val is necessary because we want to delay
+ // the initialization of the compression codec until it is first used. The reason is that a Spark
+ // program could be using a user-defined codec in a third-party jar, which is loaded in
+ // Executor.updateDependencies. When the BlockManager is initialized, user-level jars haven't been
+ // loaded yet.
+ private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()
+
/**
* Construct a BlockManager with a memory limit set based on system properties.
*/
@@ -919,18 +925,14 @@ private[spark] class BlockManager(
* Wrap an output stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: OutputStream): OutputStream = {
- if (shouldCompress(blockId)) {
- (new LZFOutputStream(s)).setFinishBlockOnFlush(true)
- } else {
- s
- }
+ if (shouldCompress(blockId)) compressionCodec.compressedOutputStream(s) else s
}
/**
* Wrap an input stream for compression if block compression is enabled for its block type
*/
def wrapForCompression(blockId: String, s: InputStream): InputStream = {
- if (shouldCompress(blockId)) new LZFInputStream(s) else s
+ if (shouldCompress(blockId)) compressionCodec.compressedInputStream(s) else s
}
def dataSerialize(
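
A minimal sketch of the pattern this hunk introduces (class name hypothetical, API names as added in spark.io.CompressionCodec): the codec is resolved through a lazy val so that jars added by the user, which may define a custom codec, are already on the classpath before the first compressed stream is created.

    import java.io.{InputStream, OutputStream}

    import spark.io.CompressionCodec

    class LazyCompressionWrapper {
      // Resolved on first use, after user jars have been added to the classpath.
      private lazy val compressionCodec: CompressionCodec = CompressionCodec.createCodec()

      def wrapOut(shouldCompress: Boolean, s: OutputStream): OutputStream =
        if (shouldCompress) compressionCodec.compressedOutputStream(s) else s

      def wrapIn(shouldCompress: Boolean, s: InputStream): InputStream =
        if (shouldCompress) compressionCodec.compressedInputStream(s) else s
    }
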
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index 3186f7c85b..76128e8cff 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -23,6 +23,7 @@ import akka.pattern.ask
import akka.util.Duration
import spark.{Logging, SparkException}
+import spark.storage.BlockManagerMessages._
private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging {
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 244000d952..011bb6b83d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -29,6 +29,8 @@ import akka.util.Duration
import akka.util.duration._
import spark.{Logging, Utils, SparkException}
+import spark.storage.BlockManagerMessages._
+
/**
* BlockManagerMasterActor is an actor on the master node to track statuses of
diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
index 01de4ccb8f..9375a9ca54 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala
@@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from the master to slaves.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerSlave
-
-// Remove a block from the slaves that have it. This can only be used to remove
-// blocks that the master knows about.
-private[spark]
-case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
-
-// Remove all blocks belonging to a specific RDD.
-private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
-
-
-//////////////////////////////////////////////////////////////////////////////////
-// Messages from slaves to the master.
-//////////////////////////////////////////////////////////////////////////////////
-private[spark]
-sealed trait ToBlockManagerMaster
-
-private[spark]
-case class RegisterBlockManager(
- blockManagerId: BlockManagerId,
- maxMemSize: Long,
- sender: ActorRef)
- extends ToBlockManagerMaster
-
-private[spark]
-case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
-
-private[spark]
-class UpdateBlockInfo(
- var blockManagerId: BlockManagerId,
- var blockId: String,
- var storageLevel: StorageLevel,
- var memSize: Long,
- var diskSize: Long)
- extends ToBlockManagerMaster
- with Externalizable {
-
- def this() = this(null, null, null, 0, 0) // For deserialization only
-
- override def writeExternal(out: ObjectOutput) {
- blockManagerId.writeExternal(out)
- out.writeUTF(blockId)
- storageLevel.writeExternal(out)
- out.writeLong(memSize)
- out.writeLong(diskSize)
+private[storage] object BlockManagerMessages {
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from the master to slaves.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerSlave
+
+ // Remove a block from the slaves that have it. This can only be used to remove
+ // blocks that the master knows about.
+ case class RemoveBlock(blockId: String) extends ToBlockManagerSlave
+
+ // Remove all blocks belonging to a specific RDD.
+ case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave
+
+
+ //////////////////////////////////////////////////////////////////////////////////
+ // Messages from slaves to the master.
+ //////////////////////////////////////////////////////////////////////////////////
+ sealed trait ToBlockManagerMaster
+
+ case class RegisterBlockManager(
+ blockManagerId: BlockManagerId,
+ maxMemSize: Long,
+ sender: ActorRef)
+ extends ToBlockManagerMaster
+
+ case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+ class UpdateBlockInfo(
+ var blockManagerId: BlockManagerId,
+ var blockId: String,
+ var storageLevel: StorageLevel,
+ var memSize: Long,
+ var diskSize: Long)
+ extends ToBlockManagerMaster
+ with Externalizable {
+
+ def this() = this(null, null, null, 0, 0) // For deserialization only
+
+ override def writeExternal(out: ObjectOutput) {
+ blockManagerId.writeExternal(out)
+ out.writeUTF(blockId)
+ storageLevel.writeExternal(out)
+ out.writeLong(memSize)
+ out.writeLong(diskSize)
+ }
+
+ override def readExternal(in: ObjectInput) {
+ blockManagerId = BlockManagerId(in)
+ blockId = in.readUTF()
+ storageLevel = StorageLevel(in)
+ memSize = in.readLong()
+ diskSize = in.readLong()
+ }
}
- override def readExternal(in: ObjectInput) {
- blockManagerId = BlockManagerId(in)
- blockId = in.readUTF()
- storageLevel = StorageLevel(in)
- memSize = in.readLong()
- diskSize = in.readLong()
+ object UpdateBlockInfo {
+ def apply(blockManagerId: BlockManagerId,
+ blockId: String,
+ storageLevel: StorageLevel,
+ memSize: Long,
+ diskSize: Long): UpdateBlockInfo = {
+ new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
+ }
+
+ // For pattern-matching
+ def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
+ Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
+ }
}
-}
-private[spark]
-object UpdateBlockInfo {
- def apply(blockManagerId: BlockManagerId,
- blockId: String,
- storageLevel: StorageLevel,
- memSize: Long,
- diskSize: Long): UpdateBlockInfo = {
- new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize)
- }
+ case class GetLocations(blockId: String) extends ToBlockManagerMaster
- // For pattern-matching
- def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
- Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
- }
-}
+ case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
-private[spark]
-case class GetLocations(blockId: String) extends ToBlockManagerMaster
+ case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
-private[spark]
-case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+ case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
-private[spark]
-case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+ case object StopBlockManagerMaster extends ToBlockManagerMaster
-private[spark]
-case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
+ case object GetMemoryStatus extends ToBlockManagerMaster
-private[spark]
-case object StopBlockManagerMaster extends ToBlockManagerMaster
+ case object ExpireDeadHosts extends ToBlockManagerMaster
-private[spark]
-case object GetMemoryStatus extends ToBlockManagerMaster
-
-private[spark]
-case object ExpireDeadHosts extends ToBlockManagerMaster
-
-private[spark]
-case object GetStorageStatus extends ToBlockManagerMaster
+ case object GetStorageStatus extends ToBlockManagerMaster
+}
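
Illustrative only: with the messages now nested in BlockManagerMessages, call sites import the object's members (as the BlockManagerMaster and actor changes below do) and pattern-match exactly as before. Object and method names in this sketch are hypothetical.

    package spark.storage  // the messages object is private[storage]

    import BlockManagerMessages._

    object MasterMessageSketch {
      def describe(msg: ToBlockManagerMaster): String = msg match {
        case HeartBeat(id) => "heartbeat from " + id
        case GetLocations(blockId) => "locations of block " + blockId
        case RemoveExecutor(execId) => "drop state for executor " + execId
        case UpdateBlockInfo(bmId, blockId, level, memSize, diskSize) =>
          "block %s on %s: %d bytes in memory, %d on disk".format(blockId, bmId, memSize, diskSize)
        case _ => "other message to the master"
      }
    }
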
diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
index 45cffad810..6e5fb43732 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala
@@ -19,7 +19,7 @@ package spark.storage
import akka.actor.Actor
-import spark.{Logging, SparkException, Utils}
+import spark.storage.BlockManagerMessages._
/**
diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala
index 4faa715c94..2aecd1ea71 100644
--- a/core/src/main/scala/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala
@@ -3,7 +3,7 @@ package spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
-import spark.storage._
+
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
val metricRegistry = new MetricRegistry()
diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala
index ab72dbb62b..bcce26b7c1 100644
--- a/core/src/main/scala/spark/storage/BlockMessage.scala
+++ b/core/src/main/scala/spark/storage/BlockMessage.scala
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
-import spark._
import spark.network._
private[spark] case class GetBlock(id: String)
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index b0229d6124..ee2fc167d5 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -19,7 +19,6 @@ package spark.storage
import java.nio.ByteBuffer
-import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer
import spark._
@@ -113,7 +112,7 @@ private[spark] object BlockMessageArray {
def main(args: Array[String]) {
val blockMessages =
- (0 until 10).map(i => {
+ (0 until 10).map { i =>
if (i % 2 == 0) {
val buffer = ByteBuffer.allocate(100)
buffer.clear
@@ -121,7 +120,7 @@ private[spark] object BlockMessageArray {
} else {
BlockMessage.fromGetBlock(GetBlock(i.toString))
}
- })
+ }
val blockMessageArray = new BlockMessageArray(blockMessages)
println("Block message array created")
diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
index 01ed6e8c1f..3812009ca1 100644
--- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala
@@ -17,8 +17,6 @@
package spark.storage
-import java.nio.ByteBuffer
-
/**
* An interface for writing JVM objects to some underlying storage. This interface allows
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3495d653bd..3ebfe173b1 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -66,7 +66,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
override def close() {
if (initialized) {
objOut.close()
- bs.close()
channel = null
bs = null
objOut = null
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
index e33c80282a..226fe49aaf 100644
--- a/core/src/main/scala/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -28,14 +28,14 @@ private[spark] object UIUtils {
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value)
: Seq[Node] = {
- val storage = page match {
- case Storage => <li class="active"><a href="/storage">Storage</a></li>
- case _ => <li><a href="/storage">Storage</a></li>
- }
val jobs = page match {
case Jobs => <li class="active"><a href="/stages">Jobs</a></li>
case _ => <li><a href="/stages">Jobs</a></li>
}
+ val storage = page match {
+ case Storage => <li class="active"><a href="/storage">Storage</a></li>
+ case _ => <li><a href="/storage">Storage</a></li>
+ }
val environment = page match {
case Environment => <li class="active"><a href="/environment">Environment</a></li>
case _ => <li><a href="/environment">Environment</a></li>
@@ -65,18 +65,14 @@ private[spark] object UIUtils {
<div class="navbar">
<div class="navbar-inner">
<div class="container">
- <div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div>
+ <a href="/" class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></a>
<ul class="nav">
- {storage}
{jobs}
+ {storage}
{environment}
{executors}
</ul>
- <ul id="infolist">
- <li>Application: <strong>{sc.appName}</strong></li>
- <li>Master: <strong>{sc.master}</strong></li>
- <li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li>
- </ul>
+ <p class="navbar-text pull-right">Application: <strong>{sc.appName}</strong></p>
</div>
</div>
</div>
@@ -117,9 +113,9 @@ private[spark] object UIUtils {
<img src="/static/spark_logo.png" />
</div>
<div class="span10">
- <h1 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
+ <h3 style="vertical-align: bottom; margin-top: 40px; display: inline-block;">
{title}
- </h1>
+ </h3>
</div>
</div>
{content}
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
index a80e2d7002..97ea644021 100644
--- a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,8 @@ import scala.util.Random
import spark.SparkContext
import spark.SparkContext._
-
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
@@ -29,18 +30,29 @@ import spark.SparkContext._
*/
private[spark] object UIWorkloadGenerator {
val NUM_PARTITIONS = 100
- val INTER_JOB_WAIT_MS = 500
+ val INTER_JOB_WAIT_MS = 5000
def main(args: Array[String]) {
+ if (args.length < 2) {
+ println("usage: ./run spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ System.exit(1)
+ }
val master = args(0)
+ val schedulingMode = SchedulingMode.withName(args(1))
val appName = "Spark UI Tester"
+
+ if (schedulingMode == SchedulingMode.FAIR) {
+ System.setProperty("spark.cluster.schedulingmode", "FAIR")
+ }
val sc = new SparkContext(master, appName)
- // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
- // but we pass it here anyways since it will be useful once we do.
- def setName(s: String) = {
- sc.addLocalProperties("spark.job.annotation", s)
+ def setProperties(s: String) = {
+ if (schedulingMode == SchedulingMode.FAIR) {
+ sc.addLocalProperty("spark.scheduler.cluster.fair.pool", s)
+ }
+ sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
}
+
val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
def nextFloat() = (new Random()).nextFloat()
@@ -73,14 +85,18 @@ private[spark] object UIWorkloadGenerator {
while (true) {
for ((desc, job) <- jobs) {
- try {
- setName(desc)
- job()
- println("Job funished: " + desc)
- } catch {
- case e: Exception =>
- println("Job Failed: " + desc)
- }
+ new Thread {
+ override def run() {
+ try {
+ setProperties(desc)
+ job()
+ println("Job funished: " + desc)
+ } catch {
+ case e: Exception =>
+ println("Job Failed: " + desc)
+ }
+ }
+ }.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
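
A condensed sketch of the per-job pattern the generator now uses (helper and object names hypothetical; the property keys are the ones set above): each job runs in its own thread and tags itself with a fair-scheduler pool and a job description before executing.

    import spark.SparkContext

    object TaggedJobSketch {
      def runTagged(sc: SparkContext, desc: String, fair: Boolean)(job: => Unit) {
        new Thread {
          override def run() {
            if (fair) {
              sc.addLocalProperty("spark.scheduler.cluster.fair.pool", desc)
            }
            sc.addLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, desc)
            job  // run the actual work with the thread-local properties in place
          }
        }.start()
      }
    }
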
diff --git a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
index 5ae7935ed4..e98302611e 100644
--- a/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/spark/ui/env/EnvironmentUI.scala
@@ -44,7 +44,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
("Java Home", Properties.javaHome),
("Scala Version", Properties.versionString),
("Scala Home", Properties.scalaHome)
- )
+ ).sorted
def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
def jvmTable = UIUtils.listingTable(Seq("Name", "Value"), jvmRow, jvmInformation)
@@ -53,8 +53,8 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.filter{case (k, v) => k.contains("java.class.path")}
.headOption
.getOrElse("", "")
- val sparkProperties = properties.filter(_._1.startsWith("spark"))
- val otherProperties = properties.diff(sparkProperties :+ classPathProperty)
+ val sparkProperties = properties.filter(_._1.startsWith("spark")).sorted
+ val otherProperties = properties.diff(sparkProperties :+ classPathProperty).sorted
val propertyHeaders = Seq("Name", "Value")
def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
@@ -67,7 +67,7 @@ private[spark] class EnvironmentUI(sc: SparkContext) {
.map(e => (e, "System Classpath"))
val addedJars = sc.addedJars.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
val addedFiles = sc.addedFiles.iterator.toSeq.map{case (path, time) => (path, "Added By User")}
- val classPath = addedJars ++ addedFiles ++ classPathEntries
+ val classPath = (addedJars ++ addedFiles ++ classPathEntries).sorted
val classPathHeaders = Seq("Resource", "Source")
def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index b70153fd30..4be2bfa413 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -93,7 +93,8 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
- val activeTasks = listener.executorToTasksActive.getOrElse(a.toString, Seq[Long]()).size.toString
+ val activeTasks = listener.executorToTasksActive.get(a.toString).map(l => l.size)
+ .getOrElse(0).toString
val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
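
The executor-page change above replaces a getOrElse whose default collection had an unrelated element type with a plain Option lookup; the result is the same (0 for executors with no recorded active tasks). A tiny stand-alone illustration with hypothetical data:

    import scala.collection.mutable

    object ActiveTaskCountSketch {
      def main(args: Array[String]) {
        val executorToTasksActive = mutable.HashMap("exec-1" -> mutable.HashSet(1L, 2L))
        val known = executorToTasksActive.get("exec-1").map(_.size).getOrElse(0)    // 2
        val unknown = executorToTasksActive.get("exec-9").map(_.size).getOrElse(0)  // 0
        println(known + " " + unknown)
      }
    }
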
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
index 646ae5ecbc..b611b0fe85 100644
--- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -17,23 +17,18 @@
package spark.ui.jobs
-import java.util.Date
-
import javax.servlet.http.HttpServletRequest
-import scala.Some
import scala.xml.{NodeSeq, Node}
-import spark.scheduler.Stage
-import spark.storage.StorageLevel
+import spark.scheduler.cluster.SchedulingMode
import spark.ui.Page._
import spark.ui.UIUtils._
import spark.Utils
-/** Page showing list of all ongoing and recently finished stages */
+/** Page showing list of all ongoing and recently finished stages and pools */
private[spark] class IndexPage(parent: JobProgressUI) {
def listener = parent.listener
- val dateFmt = parent.dateFmt
def render(request: HttpServletRequest): Seq[Node] = {
val activeStages = listener.activeStages.toSeq
@@ -46,29 +41,19 @@ private[spark] class IndexPage(parent: JobProgressUI) {
activeTime += t.timeRunning(now)
}
- /** Special table which merges two header cells. */
- def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>
- <th>Stage Id</th>
- <th>Origin</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th colspan="2">Tasks: Complete/Total</th>
- <th>Shuffle Read</th>
- <th>Shuffle Write</th>
- <th>Stored RDD</th>
- </thead>
- <tbody>
- {rows.map(r => makeRow(r))}
- </tbody>
- </table>
- }
+ val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+ val completedStagesTable = new StageTable(completedStages.sortBy(_.submissionTime).reverse, parent)
+ val failedStagesTable = new StageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
+ val poolTable = new PoolTable(listener.sc.getAllPools, listener)
val summary: NodeSeq =
<div>
<ul class="unstyled">
- <li>
+ <li>
+ <strong>Duration: </strong>
+ {parent.formatDuration(now - listener.sc.startTime)}
+ </li>
+ <li>
<strong>CPU time: </strong>
{parent.formatDuration(listener.totalTime + activeTime)}
</li>
@@ -84,80 +69,35 @@ private[spark] class IndexPage(parent: JobProgressUI) {
{Utils.memoryBytesToString(listener.totalShuffleWrite)}
</li>
}
+ <li>
+ <a href="#active"><strong>Active Stages:</strong></a>
+ {activeStages.size}
+ </li>
+ <li>
+ <a href="#completed"><strong>Completed Stages:</strong></a>
+ {completedStages.size}
+ </li>
+ <li>
+ <a href="#failed"><strong>Failed Stages:</strong></a>
+ {failedStages.size}
+ </li>
+ <li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
</ul>
</div>
- val activeStageTable: NodeSeq = stageTable(stageRow, activeStages)
- val completedStageTable = stageTable(stageRow, completedStages)
- val failedStageTable: NodeSeq = stageTable(stageRow, failedStages)
- val content = summary ++
- <h2>Active Stages</h2> ++ activeStageTable ++
- <h2>Completed Stages</h2> ++ completedStageTable ++
- <h2>Failed Stages</h2> ++ failedStageTable
+ val content = summary ++
+ {if (listener.sc.getSchedulingMode == SchedulingMode.FAIR) {
+ <h3>Pools</h3> ++ poolTable.toNodeSeq
+ } else {
+ Seq()
+ }} ++
+ <h3 id="active">Active Stages : {activeStages.size}</h3> ++
+ activeStagesTable.toNodeSeq++
+ <h3 id="completed">Completed Stages : {completedStages.size}</h3> ++
+ completedStagesTable.toNodeSeq++
+ <h3 id ="failed">Failed Stages : {failedStages.size}</h3> ++
+ failedStagesTable.toNodeSeq
headerSparkPage(content, parent.sc, "Spark Stages", Jobs)
}
-
- def getElapsedTime(submitted: Option[Long], completed: Long): String = {
- submitted match {
- case Some(t) => parent.formatDuration(completed - t)
- case _ => "Unknown"
- }
- }
-
- def makeProgressBar(completed: Int, total: Int): Seq[Node] = {
- val width=130
- val height=15
- val completeWidth = (completed.toDouble / total) * width
-
- <svg width={width.toString} height={height.toString}>
- <rect width={width.toString} height={height.toString}
- fill="white" stroke="rgb(51,51,51)" stroke-width="1" />
- <rect width={completeWidth.toString} height={height.toString}
- fill="rgb(0,136,204)" stroke="black" stroke-width="1" />
- </svg>
- }
-
-
- def stageRow(s: Stage): Seq[Node] = {
- val submissionTime = s.submissionTime match {
- case Some(t) => dateFmt.format(new Date(t))
- case None => "Unknown"
- }
-
- val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
- case 0 => ""
- case b => Utils.memoryBytesToString(b)
- }
- val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
- case 0 => ""
- case b => Utils.memoryBytesToString(b)
- }
-
- val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
- val totalTasks = s.numPartitions
-
- <tr>
- <td>{s.id}</td>
- <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a></td>
- <td>{submissionTime}</td>
- <td>{getElapsedTime(s.submissionTime,
- s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
- <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td>
- <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks}
- {listener.stageToTasksFailed.getOrElse(s.id, 0) match {
- case f if f > 0 => "(%s failed)".format(f)
- case _ =>
- }}
- </td>
- <td>{shuffleRead}</td>
- <td>{shuffleWrite}</td>
- <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) {
- <a href={"/storage/rdd?id=%s".format(s.rdd.id)}>
- {Option(s.rdd.name).getOrElse(s.rdd.id)}
- </a>
- }}
- </td>
- </tr>
- }
}
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
new file mode 100644
index 0000000000..f22c4e39e3
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressListener.scala
@@ -0,0 +1,167 @@
+package spark.ui.jobs
+
+import scala.Seq
+import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
+
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
+import spark.scheduler._
+import spark.scheduler.cluster.TaskInfo
+import spark.executor.TaskMetrics
+import collection.mutable
+
+private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener {
+ // How many stages to remember
+ val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
+ val DEFAULT_POOL_NAME = "default"
+
+ val stageToPool = new HashMap[Stage, String]()
+ val stageToDescription = new HashMap[Stage, String]()
+ val poolToActiveStages = new HashMap[String, HashSet[Stage]]()
+
+ val activeStages = HashSet[Stage]()
+ val completedStages = ListBuffer[Stage]()
+ val failedStages = ListBuffer[Stage]()
+
+ // Total metrics reflect metrics only for completed tasks
+ var totalTime = 0L
+ var totalShuffleRead = 0L
+ var totalShuffleWrite = 0L
+
+ val stageToTime = HashMap[Int, Long]()
+ val stageToShuffleRead = HashMap[Int, Long]()
+ val stageToShuffleWrite = HashMap[Int, Long]()
+ val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
+ val stageToTasksComplete = HashMap[Int, Int]()
+ val stageToTasksFailed = HashMap[Int, Int]()
+ val stageToTaskInfos =
+ HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+ override def onJobStart(jobStart: SparkListenerJobStart) {}
+
+ override def onStageCompleted(stageCompleted: StageCompleted) = {
+ val stage = stageCompleted.stageInfo.stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ activeStages -= stage
+ completedStages += stage
+ trimIfNecessary(completedStages)
+ }
+
+ /** If stages is too large, remove and garbage collect old stages */
+ def trimIfNecessary(stages: ListBuffer[Stage]) {
+ if (stages.size > RETAINED_STAGES) {
+ val toRemove = RETAINED_STAGES / 10
+ stages.takeRight(toRemove).foreach( s => {
+ stageToTaskInfos.remove(s.id)
+ stageToTime.remove(s.id)
+ stageToShuffleRead.remove(s.id)
+ stageToShuffleWrite.remove(s.id)
+ stageToTasksActive.remove(s.id)
+ stageToTasksComplete.remove(s.id)
+ stageToTasksFailed.remove(s.id)
+ stageToPool.remove(s)
+ if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+ })
+ stages.trimEnd(toRemove)
+ }
+ }
+
+ /** For FIFO, all stages are contained in the "default" pool, but the "default" pool here is meaningless */
+ override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
+ val stage = stageSubmitted.stage
+ activeStages += stage
+
+ val poolName = Option(stageSubmitted.properties).map {
+ p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+ }.getOrElse(DEFAULT_POOL_NAME)
+ stageToPool(stage) = poolName
+
+ val description = Option(stageSubmitted.properties).flatMap {
+ p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+ }
+ description.map(d => stageToDescription(stage) = d)
+
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+ stages += stage
+ }
+
+ override def onTaskStart(taskStart: SparkListenerTaskStart) {
+ val sid = taskStart.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive += taskStart.taskInfo
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList += ((taskStart.taskInfo, None, None))
+ stageToTaskInfos(sid) = taskList
+ }
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val sid = taskEnd.task.stageId
+ val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+ tasksActive -= taskEnd.taskInfo
+ val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+ taskEnd.reason match {
+ case e: ExceptionFailure =>
+ stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+ (Some(e), e.metrics)
+ case _ =>
+ stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+ (None, Option(taskEnd.taskMetrics))
+ }
+
+ stageToTime.getOrElseUpdate(sid, 0L)
+ val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+ stageToTime(sid) += time
+ totalTime += time
+
+ stageToShuffleRead.getOrElseUpdate(sid, 0L)
+ val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+ s.remoteBytesRead).getOrElse(0L)
+ stageToShuffleRead(sid) += shuffleRead
+ totalShuffleRead += shuffleRead
+
+ stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+ val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+ s.shuffleBytesWritten).getOrElse(0L)
+ stageToShuffleWrite(sid) += shuffleWrite
+ totalShuffleWrite += shuffleWrite
+
+ val taskList = stageToTaskInfos.getOrElse(
+ sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+ taskList -= ((taskEnd.taskInfo, None, None))
+ taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+ stageToTaskInfos(sid) = taskList
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+ jobEnd match {
+ case end: SparkListenerJobEnd =>
+ end.jobResult match {
+ case JobFailed(ex, Some(stage)) =>
+ activeStages -= stage
+ poolToActiveStages(stageToPool(stage)) -= stage
+ failedStages += stage
+ trimIfNecessary(failedStages)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+
+ /** Is this stage's input from a shuffle read. */
+ def hasShuffleRead(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+
+ /** Is this stage's output to a shuffle write. */
+ def hasShuffleWrite(stageID: Int): Boolean = {
+ // This is written in a slightly complicated way to avoid having to scan all tasks
+ for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
+ if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
+ }
+ return false // No tasks have finished for this stage
+ }
+}
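
The null handling in onStageSubmitted above is easy to miss; a small sketch of the same fallback logic (object and method names hypothetical, key and default as in the listener): a null Properties object and a missing key both fall back to the "default" pool.

    import java.util.Properties

    object PoolNameSketch {
      val DEFAULT_POOL_NAME = "default"

      def poolNameOf(properties: Properties): String =
        Option(properties)
          .map(_.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME))
          .getOrElse(DEFAULT_POOL_NAME)

      def main(args: Array[String]) {
        println(poolNameOf(null))           // "default": FIFO jobs submit no properties
        val p = new Properties()
        p.setProperty("spark.scheduler.cluster.fair.pool", "etl")
        println(poolNameOf(p))              // "etl"
      }
    }
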
diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
index 09d24b6302..c83f102ff3 100644
--- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala
@@ -31,9 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import spark.ui.JettyUtils._
import spark.{ExceptionFailure, SparkContext, Success, Utils}
import spark.scheduler._
-import spark.scheduler.cluster.TaskInfo
-import spark.executor.TaskMetrics
import collection.mutable
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[spark] class JobProgressUI(val sc: SparkContext) {
@@ -43,9 +43,10 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
+ private val poolPage = new PoolPage(this)
def start() {
- _listener = Some(new JobProgressListener)
+ _listener = Some(new JobProgressListener(sc))
sc.addSparkListener(listener)
}
@@ -53,120 +54,7 @@ private[spark] class JobProgressUI(val sc: SparkContext) {
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),
+ ("/stages/pool", (request: HttpServletRequest) => poolPage.render(request)),
("/stages", (request: HttpServletRequest) => indexPage.render(request))
)
}
-
-private[spark] class JobProgressListener extends SparkListener {
- // How many stages to remember
- val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt
-
- val activeStages = HashSet[Stage]()
- val completedStages = ListBuffer[Stage]()
- val failedStages = ListBuffer[Stage]()
-
- // Total metrics reflect metrics only for completed tasks
- var totalTime = 0L
- var totalShuffleRead = 0L
- var totalShuffleWrite = 0L
-
- val stageToTime = HashMap[Int, Long]()
- val stageToShuffleRead = HashMap[Int, Long]()
- val stageToShuffleWrite = HashMap[Int, Long]()
- val stageToTasksActive = HashMap[Int, HashSet[TaskInfo]]()
- val stageToTasksComplete = HashMap[Int, Int]()
- val stageToTasksFailed = HashMap[Int, Int]()
- val stageToTaskInfos =
- HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
-
- override def onJobStart(jobStart: SparkListenerJobStart) {}
-
- override def onStageCompleted(stageCompleted: StageCompleted) = {
- val stage = stageCompleted.stageInfo.stage
- activeStages -= stage
- completedStages += stage
- trimIfNecessary(completedStages)
- }
-
- /** If stages is too large, remove and garbage collect old stages */
- def trimIfNecessary(stages: ListBuffer[Stage]) {
- if (stages.size > RETAINED_STAGES) {
- val toRemove = RETAINED_STAGES / 10
- stages.takeRight(toRemove).foreach( s => {
- stageToTaskInfos.remove(s.id)
- stageToTime.remove(s.id)
- stageToShuffleRead.remove(s.id)
- stageToShuffleWrite.remove(s.id)
- stageToTasksActive.remove(s.id)
- stageToTasksComplete.remove(s.id)
- stageToTasksFailed.remove(s.id)
- })
- stages.trimEnd(toRemove)
- }
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
- activeStages += stageSubmitted.stage
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) {
- val sid = taskStart.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive += taskStart.taskInfo
- val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList += ((taskStart.taskInfo, None, None))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val sid = taskEnd.task.stageId
- val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
- tasksActive -= taskEnd.taskInfo
- val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
- taskEnd.reason match {
- case e: ExceptionFailure =>
- stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
- (Some(e), e.metrics)
- case _ =>
- stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
- (None, Option(taskEnd.taskMetrics))
- }
-
- stageToTime.getOrElseUpdate(sid, 0L)
- val time = metrics.map(m => m.executorRunTime).getOrElse(0)
- stageToTime(sid) += time
- totalTime += time
-
- stageToShuffleRead.getOrElseUpdate(sid, 0L)
- val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
- s.remoteBytesRead).getOrElse(0L)
- stageToShuffleRead(sid) += shuffleRead
- totalShuffleRead += shuffleRead
-
- stageToShuffleWrite.getOrElseUpdate(sid, 0L)
- val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
- s.shuffleBytesWritten).getOrElse(0L)
- stageToShuffleWrite(sid) += shuffleWrite
- totalShuffleWrite += shuffleWrite
-
- val taskList = stageToTaskInfos.getOrElse(
- sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
- taskList -= ((taskEnd.taskInfo, None, None))
- taskList += ((taskEnd.taskInfo, metrics, failureInfo))
- stageToTaskInfos(sid) = taskList
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- jobEnd match {
- case end: SparkListenerJobEnd =>
- end.jobResult match {
- case JobFailed(ex, Some(stage)) =>
- activeStages -= stage
- failedStages += stage
- trimIfNecessary(failedStages)
- case _ =>
- }
- case _ =>
- }
- }
-}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
new file mode 100644
index 0000000000..647c6d2ae3
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolPage.scala
@@ -0,0 +1,30 @@
+package spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing specific pool details */
+private[spark] class PoolPage(parent: JobProgressUI) {
+ def listener = parent.listener
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val poolName = request.getParameter("poolname")
+ val poolToActiveStages = listener.poolToActiveStages
+ val activeStages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]).toSeq
+ val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
+
+ val pool = listener.sc.getPoolForName(poolName).get
+ val poolTable = new PoolTable(Seq(pool), listener)
+
+ val content = <h3>Pool </h3> ++ poolTable.toNodeSeq() ++
+ <h3>Active Stages : {activeStages.size}</h3> ++ activeStagesTable.toNodeSeq()
+
+ headerSparkPage(content, parent.sc, "Spark Pool Details", Jobs)
+ }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
new file mode 100644
index 0000000000..9cfe0d68f0
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/PoolTable.scala
@@ -0,0 +1,49 @@
+package spark.ui.jobs
+
+import scala.xml.Node
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.Stage
+import spark.scheduler.cluster.Schedulable
+
+/** Table showing list of pools */
+private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressListener) {
+
+ var poolToActiveStages: HashMap[String, HashSet[Stage]] = listener.poolToActiveStages
+
+ def toNodeSeq(): Seq[Node] = {
+ poolTable(poolRow, pools)
+ }
+
+ // pool tables
+ def poolTable(makeRow: (Schedulable, HashMap[String, HashSet[Stage]]) => Seq[Node],
+ rows: Seq[Schedulable]
+ ): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Pool Name</th>
+ <th>Minimum Share</th>
+ <th>Pool Weight</th>
+ <td>Active Stages</td>
+ <td>Running Tasks</td>
+ <td>SchedulingMode</td>
+ </thead>
+ <tbody>
+ {rows.map(r => makeRow(r, poolToActiveStages))}
+ </tbody>
+ </table>
+ }
+
+ def poolRow(p: Schedulable, poolToActiveStages: HashMap[String, HashSet[Stage]]): Seq[Node] = {
+ <tr>
+ <td><a href={"/stages/pool?poolname=%s".format(p.name)}>{p.name}</a></td>
+ <td>{p.minShare}</td>
+ <td>{p.weight}</td>
+ <td>{poolToActiveStages.getOrElseUpdate(p.name, new HashSet[Stage]()).size}</td>
+ <td>{p.runningTasks}</td>
+ <td>{p.schedulingMode}</td>
+ </tr>
+ }
+}
+
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
new file mode 100644
index 0000000000..1df0e0913c
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -0,0 +1,116 @@
+package spark.ui.jobs
+
+import java.util.Date
+import java.text.SimpleDateFormat
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import spark.scheduler.cluster.{SchedulingMode, TaskInfo}
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.Utils
+import spark.storage.StorageLevel
+
+/** Table showing the list of ongoing and recently finished stages */
+private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressUI) {
+
+ val listener = parent.listener
+ val dateFmt = parent.dateFmt
+ val isFairScheduler = listener.sc.getSchedulingMode == SchedulingMode.FAIR
+
+ def toNodeSeq(): Seq[Node] = {
+ stageTable(stageRow, stages)
+ }
+
+ /** Special table which merges two header cells. */
+ def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = {
+ <table class="table table-bordered table-striped table-condensed sortable">
+ <thead>
+ <th>Stage Id</th>
+ {if (isFairScheduler) {<th>Pool Name</th>} else {}}
+ <th>Description</th>
+ <th>Submitted</th>
+ <td>Duration</td>
+ <td>Tasks: Succeeded/Total</td>
+ <td>Shuffle Read</td>
+ <td>Shuffle Write</td>
+ </thead>
+ <tbody>
+ {rows.map(r => makeRow(r))}
+ </tbody>
+ </table>
+ }
+
+ def getElapsedTime(submitted: Option[Long], completed: Long): String = {
+ submitted match {
+ case Some(t) => parent.formatDuration(completed - t)
+ case _ => "Unknown"
+ }
+ }
+
+ def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] = {
+ val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+ val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+ <div class="progress" style="height: 15px; margin-bottom: 0px; position: relative">
+ <span style="text-align:center; position:absolute; width:100%;">
+ {completed}/{total} {failed}
+ </span>
+ <div class="bar bar-completed" style={completeWidth}></div>
+ <div class="bar bar-running" style={startWidth}></div>
+ </div>
+ }
+
+
+ def stageRow(s: Stage): Seq[Node] = {
+ val submissionTime = s.submissionTime match {
+ case Some(t) => dateFmt.format(new Date(t))
+ case None => "Unknown"
+ }
+
+ val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
+ }
+ val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
+ case 0 => ""
+ case b => Utils.memoryBytesToString(b)
+ }
+
+ val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
+ val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0)
+ val failedTasks = listener.stageToTasksFailed.getOrElse(s.id, 0) match {
+ case f if f > 0 => "(%s failed)".format(f)
+ case _ => ""
+ }
+ val totalTasks = s.numPartitions
+
+ val poolName = listener.stageToPool.get(s)
+
+ val nameLink = <a href={"/stages/stage?id=%s".format(s.id)}>{s.name}</a>
+ val description = listener.stageToDescription.get(s)
+ .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
+
+ <tr>
+ <td>{s.id}</td>
+ {if (isFairScheduler) {
+ <td><a href={"/stages/pool?poolname=%s".format(poolName.get)}>{poolName.get}</a></td>}
+ }
+ <td>{description}</td>
+ <td valign="middle">{submissionTime}</td>
+ <td>{getElapsedTime(s.submissionTime,
+ s.completionTime.getOrElse(System.currentTimeMillis()))}</td>
+ <td class="progress-cell">
+ {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+ </td>
+ <td>{shuffleRead}</td>
+ <td>{shuffleWrite}</td>
+ </tr>
+ }
+}
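
For reference, the progress-bar arithmetic in makeProgressBar above expresses both bars as percentages of the total task count, so the "running" bar stacks behind the "completed" one. A quick check with illustrative values (object name hypothetical):

    object ProgressWidthSketch {
      def widths(started: Int, completed: Int, total: Int): (String, String) = (
        "width: %s%%".format((completed.toDouble / total) * 100),
        "width: %s%%".format((started.toDouble / total) * 100)
      )

      def main(args: Array[String]) {
        // With 8 tasks total, 5 finished and 4 still running:
        println(widths(started = 4, completed = 5, total = 8))  // (width: 62.5%,width: 50.0%)
      }
    }
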
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 003be54ad8..cd828778a6 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -83,18 +83,19 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<hr/>
<div class="row">
<div class="span12">
+ <h3> Data Distribution Summary </h3>
{workerTable}
</div>
</div>
<hr/>
<div class="row">
<div class="span12">
- <h3> RDD Summary </h3>
+ <h3> Partitions </h3>
{blockTable}
</div>
</div>;
- headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs)
+ headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Storage)
}
def blockRow(row: (String, BlockStatus, Seq[String])): Seq[Node] = {
diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index ed49386f18..a47cac3b96 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -73,7 +73,6 @@ class Vector(val elements: Array[Double]) extends Serializable {
def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
- var ans = 0.0
var i = 0
while (i < length) {
elements(i) += other(i)
@@ -117,9 +116,7 @@ object Vector {
def apply(elements: Double*) = new Vector(elements.toArray)
def apply(length: Int, initializer: Int => Double): Vector = {
- val elements = new Array[Double](length)
- for (i <- 0 until length)
- elements(i) = initializer(i)
+ val elements: Array[Double] = Array.tabulate(length)(initializer)
return new Vector(elements)
}
diff --git a/core/src/test/scala/spark/KryoSerializerSuite.scala b/core/src/test/scala/spark/KryoSerializerSuite.scala
index 30d2d5282b..01390027c8 100644
--- a/core/src/test/scala/spark/KryoSerializerSuite.scala
+++ b/core/src/test/scala/spark/KryoSerializerSuite.scala
@@ -22,7 +22,9 @@ import scala.collection.mutable
import org.scalatest.FunSuite
import com.esotericsoftware.kryo._
-class KryoSerializerSuite extends FunSuite {
+import KryoTest._
+
+class KryoSerializerSuite extends FunSuite with SharedSparkContext {
test("basic types") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {
@@ -124,6 +126,45 @@ class KryoSerializerSuite extends FunSuite {
System.clearProperty("spark.kryo.registrator")
}
+
+ test("kryo with collect") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)).collect().map(_.x)
+ assert(control === result.toSeq)
+ }
+
+ test("kryo with parallelize") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control.map(new ClassWithoutNoArgConstructor(_))).map(_.x).collect()
+ assert (control === result.toSeq)
+ }
+
+ test("kryo with reduce") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+ .reduce((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+ assert(control.sum === result)
+ }
+
+ // TODO: this still doesn't work
+ ignore("kryo with fold") {
+ val control = 1 :: 2 :: Nil
+ val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
+ .fold(new ClassWithoutNoArgConstructor(10))((t1, t2) => new ClassWithoutNoArgConstructor(t1.x + t2.x)).x
+ assert(10 + control.sum === result)
+ }
+
+ override def beforeAll() {
+ System.setProperty("spark.serializer", "spark.KryoSerializer")
+ System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
+ super.beforeAll()
+ }
+
+ override def afterAll() {
+ super.afterAll()
+ System.clearProperty("spark.kryo.registrator")
+ System.clearProperty("spark.serializer")
+ }
}
object KryoTest {
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
new file mode 100644
index 0000000000..1ba82fe2b9
--- /dev/null
+++ b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.io
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
+
+import org.scalatest.FunSuite
+
+
+class CompressionCodecSuite extends FunSuite {
+
+ def testCodec(codec: CompressionCodec) {
+ // Write 1000 integers to the output stream, compressed.
+ val outputStream = new ByteArrayOutputStream()
+ val out = codec.compressedOutputStream(outputStream)
+ for (i <- 1 until 1000) {
+ out.write(i % 256)
+ }
+ out.close()
+
+ // Read the 1000 integers back.
+ val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
+ val in = codec.compressedInputStream(inputStream)
+ for (i <- 1 until 1000) {
+ assert(in.read() === i % 256)
+ }
+ in.close()
+ }
+
+ test("default compression codec") {
+ val codec = CompressionCodec.createCodec()
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("lzf compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
+ assert(codec.getClass === classOf[LZFCompressionCodec])
+ testCodec(codec)
+ }
+
+ test("snappy compression codec") {
+ val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
+ assert(codec.getClass === classOf[SnappyCompressionCodec])
+ testCodec(codec)
+ }
+}
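
The suite above passes a codec class name explicitly; in regular use the class is expected to come from the spark.io.compression.codec system property documented later in this patch, with Snappy as the default (per the first test). A hedged sketch, assuming the no-argument createCodec() consults that property:

    import spark.io.{CompressionCodec, LZFCompressionCodec}

    object CodecSelectionSketch {
      def main(args: Array[String]) {
        System.setProperty("spark.io.compression.codec", classOf[LZFCompressionCodec].getName)
        val codec = CompressionCodec.createCodec()
        println(codec.getClass.getName)  // expected: spark.io.LZFCompressionCodec
        System.clearProperty("spark.io.compression.codec")
      }
    }
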
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index a8b88d7936..caaf3209fd 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -32,6 +32,10 @@ import spark.{Dependency, ShuffleDependency, OneToOneDependency}
import spark.{FetchFailed, Success, TaskEndReason}
import spark.storage.{BlockManagerId, BlockManagerMaster}
+import spark.scheduler.cluster.Pool
+import spark.scheduler.cluster.SchedulingMode
+import spark.scheduler.cluster.SchedulingMode.SchedulingMode
+
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
* rather than spawning an event loop thread as happens in the real code. They use EasyMock
@@ -49,6 +53,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
/** Set of TaskSets the DAGScheduler has requested executed. */
val taskSets = scala.collection.mutable.Buffer[TaskSet]()
val taskScheduler = new TaskScheduler() {
+ override def rootPool: Pool = null
+ override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def start() = {}
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {
diff --git a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
index 0f855c38da..bb9e715f95 100644
--- a/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/JobLoggerSuite.scala
@@ -57,7 +57,7 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers
val shuffleMapStage = new Stage(1, parentRdd, Some(shuffleDep), Nil, jobID, None)
val rootStage = new Stage(0, rootRdd, None, List(shuffleMapStage), jobID, None)
- joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4))
+ joblogger.onStageSubmitted(SparkListenerStageSubmitted(rootStage, 4, null))
joblogger.getRddNameTest(parentRdd) should be (parentRdd.getClass.getName)
parentRdd.setName("MyRDD")
joblogger.getRddNameTest(parentRdd) should be ("MyRDD")
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
index 14bb58731b..66fd59e8bb 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
@@ -73,7 +73,7 @@ class LocalSchedulerSuite extends FunSuite with LocalSparkContext {
TaskThreadInfo.threadToStarted(threadIndex) = new CountDownLatch(1)
new Thread {
if (poolName != null) {
- sc.addLocalProperties("spark.scheduler.cluster.fair.pool",poolName)
+ sc.addLocalProperty("spark.scheduler.cluster.fair.pool", poolName)
}
override def run() {
val ans = nums.map(number => {
diff --git a/docs/configuration.md b/docs/configuration.md
index 5c06897cae..99624a44aa 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -35,7 +35,7 @@ for these variables.
* `SPARK_JAVA_OPTS`, to add JVM options. This includes any system properties that you'd like to pass with `-D`.
* `SPARK_CLASSPATH`, to add elements to Spark's classpath.
* `SPARK_LIBRARY_PATH`, to add search directories for native libraries.
-* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
+* `SPARK_MEM`, to set the amount of memory used per node. This should be in the same format as the
JVM's -Xmx option, e.g. `300m` or `1g`. Note that this option will soon be deprecated in favor of
the `spark.executor.memory` system property, so we recommend using that in new code.
@@ -77,7 +77,7 @@ there are at least five properties that you will commonly want to control:
Class to use for serializing objects that will be sent over the network or need to be cached
in serialized form. The default of Java serialization works with any Serializable Java object but is
quite slow, so we recommend <a href="tuning.html">using <code>spark.KryoSerializer</code>
- and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
+ and configuring Kryo serialization</a> when speed is necessary. Can be any subclass of
<a href="api/core/index.html#spark.Serializer"><code>spark.Serializer</code></a>).
</td>
</tr>
@@ -86,7 +86,7 @@ there are at least five properties that you will commonly want to control:
<td>(none)</td>
<td>
If you use Kryo serialization, set this class to register your custom classes with Kryo.
- You need to set it to a class that extends
+ You need to set it to a class that extends
<a href="api/core/index.html#spark.KryoRegistrator"><code>spark.KryoRegistrator</code></a>).
See the <a href="tuning.html#data-serialization">tuning guide</a> for more details.
</td>
@@ -181,6 +181,21 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td>spark.io.compression.codec</td>
+ <td>spark.io.SnappyCompressionCodec</td>
+ <td>
+ The compression codec class to use for various compressions. By default, Spark provides two
+ codecs: <code>spark.io.LZFCompressionCodec</code> and <code>spark.io.SnappyCompressionCodec</code>.
+ </td>
+</tr>
+<tr>
+ <td>spark.io.compression.snappy.block.size</td>
+ <td>32768</td>
+ <td>
+ Block size (in bytes) used in Snappy compression, when the Snappy compression codec is used.
+ </td>
+</tr>
+<tr>
<td>spark.reducer.maxMbInFlight</td>
<td>48</td>
<td>
@@ -295,6 +310,14 @@ Apart from these, the following properties are also available, and may be useful
Duration (milliseconds) of how long to batch new objects coming from network receivers.
</td>
</tr>
+<tr>
+ <td>spark.task.maxFailures</td>
+ <td>4</td>
+ <td>
+ Number of individual task failures before giving up on the job.
+ Should be greater than or equal to 1. Number of allowed retries = this value - 1.
+ </td>
+</tr>
</table>
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 3986c0c79d..7463844a4e 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -43,7 +43,7 @@ Finally, the following configuration options can be passed to the master and wor
</tr>
<tr>
<td><code>-p PORT</code>, <code>--port PORT</code></td>
- <td>IP address or DNS name to listen on (default: 7077 for master, random for worker)</td>
+ <td>Port for service to listen on (default: 7077 for master, random for worker)</td>
</tr>
<tr>
<td><code>--webui-port PORT</code></td>
diff --git a/examples/src/main/java/spark/examples/JavaPageRank.java b/examples/src/main/java/spark/examples/JavaPageRank.java
new file mode 100644
index 0000000000..9d90ef9174
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaPageRank.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.examples;
+
+import scala.Tuple2;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function;
+import spark.api.java.function.PairFlatMapFunction;
+import spark.api.java.function.PairFunction;
+
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * Computes the PageRank of URLs from an input file. The input file should
+ * be in the format:
+ * URL neighbor URL
+ * URL neighbor URL
+ * URL neighbor URL
+ * ...
+ * where each URL and its neighbors are separated by space(s).
+ */
+public class JavaPageRank {
+ private static double sum(List<Double> numbers) {
+ double out = 0.0;
+ for (double number : numbers) {
+ out += number;
+ }
+ return out;
+ }
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 3) {
+ System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
+ System.exit(1);
+ }
+
+ JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+    // Loads the input file. It should be in the format:
+ // URL neighbor URL
+ // URL neighbor URL
+ // URL neighbor URL
+ // ...
+ JavaRDD<String> lines = ctx.textFile(args[1], 1);
+
+    // Loads all URLs from the input file and initializes their neighbors.
+ JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
+ @Override
+ public Tuple2<String, String> call(String s) {
+ String[] parts = s.split("\\s+");
+ return new Tuple2<String, String>(parts[0], parts[1]);
+ }
+ }).distinct().groupByKey().cache();
+
+    // Initializes the rank of each URL loaded above to one.
+ JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+ @Override
+ public Double call(List<String> rs) throws Exception {
+ return 1.0;
+ }
+ });
+
+    // Iteratively calculates and updates URL ranks using the PageRank algorithm.
+ for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+ // Calculates URL contributions to the rank of other URLs.
+ JavaPairRDD<String, Double> contribs = links.join(ranks).values()
+ .flatMap(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+ @Override
+ public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+ List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
+ for (String n : s._1) {
+ results.add(new Tuple2<String, Double>(n, s._2 / s._1.size()));
+ }
+
+ return results;
+ }
+ });
+
+ // Re-calculates URL ranks based on neighbor contributions.
+ ranks = contribs.groupByKey().mapValues(new Function<List<Double>, Double>() {
+ @Override
+ public Double call(List<Double> cs) throws Exception {
+ return 0.15 + sum(cs) * 0.85;
+ }
+ });
+ }
+
+    // Collects all URL ranks and dumps them to the console.
+ List<Tuple2<String, Double>> output = ranks.collect();
+ for (Tuple2 tuple : output) {
+ System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
+ }
+
+ System.exit(0);
+ }
+}
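
The per-iteration update used above is rank = 0.15 + 0.85 * sum(contributions), where each page splits its current rank evenly among its out-links. A plain-Scala sketch of that update over a tiny in-memory graph (the graph, iteration count, and object name are made up for illustration and are not part of this commit):

    object PageRankStep {
      def main(args: Array[String]) {
        // Tiny example graph in which every page has at least one inbound link.
        val links = Map(
          "a" -> Seq("b", "c"),
          "b" -> Seq("c"),
          "c" -> Seq("a"))
        var ranks = links.map { case (url, _) => url -> 1.0 }

        for (iter <- 1 to 10) {
          // Each page contributes rank / outDegree to every page it links to.
          val contribs = links.toSeq.flatMap { case (url, neighbors) =>
            neighbors.map(n => (n, ranks(url) / neighbors.size))
          }
          ranks = contribs.groupBy(_._1).map { case (url, cs) =>
            url -> (0.15 + 0.85 * cs.map(_._2).sum)
          }
        }
        ranks.foreach { case (url, rank) => println(url + " has rank: " + rank + ".") }
      }
    }
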
diff --git a/make-distribution.sh b/make-distribution.sh
index 4374e0e8c4..0a8941c1f8 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -66,6 +66,7 @@ cp $FWDIR/repl/target/*.jar "$DISTDIR/jars/"
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/conf" "$DISTDIR"
cp "$FWDIR/run" "$FWDIR/spark-shell" "$DISTDIR"
+cp "$FWDIR/spark-executor" "$DISTDIR"
if [ "$1" == "tgz" ]; then
diff --git a/mllib/pom.xml b/mllib/pom.xml
new file mode 100644
index 0000000000..f3928cc73d
--- /dev/null
+++ b/mllib/pom.xml
@@ -0,0 +1,165 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-parent</artifactId>
+ <version>0.8.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-mllib</artifactId>
+ <packaging>jar</packaging>
+ <name>Spark Project ML Library</name>
+ <url>http://spark-project.org/</url>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jblas</groupId>
+ <artifactId>jblas</artifactId>
+ <version>1.2.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalacheck</groupId>
+ <artifactId>scalacheck_${scala.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <outputDirectory>target/scala-${scala.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <profiles>
+ <profile>
+ <id>hadoop1</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop1</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop1</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>hadoop2-yarn</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.spark-project</groupId>
+ <artifactId>spark-core</artifactId>
+ <version>${project.version}</version>
+ <classifier>hadoop2-yarn</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <classifier>hadoop2-yarn</classifier>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+</project>
diff --git a/mllib/src/main/scala/spark/mllib/classification/Classification.scala b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
index d6154b66ae..d6154b66ae 100644
--- a/mllib/src/main/scala/spark/mllib/classification/Classification.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/ClassificationModel.scala
diff --git a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
index bc1c327729..203aa8fdd4 100644
--- a/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/LogisticRegression.scala
@@ -135,8 +135,8 @@ class LogisticRegressionLocalRandomSGD private (var stepSize: Double, var miniBa
object LogisticRegressionLocalRandomSGD {
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. Each iteration uses
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient. The weights used in
* gradient descent are initialized using the initial weights provided.
*
@@ -151,17 +151,17 @@ object LogisticRegressionLocalRandomSGD {
input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
-
miniBatchFraction: Double,
initialWeights: Array[Double])
: LogisticRegressionModel =
{
- new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(input, initialWeights)
+ new LogisticRegressionLocalRandomSGD(stepSize, miniBatchFraction, numIterations).train(
+ input, initialWeights)
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. Each iteration uses
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. Each iteration uses
* `miniBatchFraction` fraction of the data to calculate the gradient.
*
* @param input RDD of (label, array of features) pairs.
@@ -174,7 +174,6 @@ object LogisticRegressionLocalRandomSGD {
input: RDD[(Int, Array[Double])],
numIterations: Int,
stepSize: Double,
-
miniBatchFraction: Double)
: LogisticRegressionModel =
{
@@ -182,9 +181,9 @@ object LogisticRegressionLocalRandomSGD {
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. We use the entire data set to update
- * the gradient in each iteration.
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using the specified step size. We use the entire data
+ * set to update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
@@ -195,17 +194,16 @@ object LogisticRegressionLocalRandomSGD {
def train(
input: RDD[(Int, Array[Double])],
numIterations: Int,
- stepSize: Double
- )
+ stepSize: Double)
: LogisticRegressionModel =
{
train(input, numIterations, stepSize, 1.0)
}
/**
- * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update
- * the gradient in each iteration.
+ * Train a logistic regression model given an RDD of (label, features) pairs. We run a fixed
+ * number of iterations of gradient descent using a step size of 1.0. We use the entire data set
+ * to update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
@@ -221,12 +219,14 @@ object LogisticRegressionLocalRandomSGD {
def main(args: Array[String]) {
if (args.length != 5) {
- println("Usage: LogisticRegression <master> <input_dir> <step_size> <regularization_parameter> <niters>")
+ println("Usage: LogisticRegression <master> <input_dir> <step_size> " +
+ "<regularization_parameter> <niters>")
System.exit(1)
}
val sc = new SparkContext(args(0), "LogisticRegression")
val data = MLUtils.loadLabeledData(sc, args(1)).map(yx => (yx._1.toInt, yx._2))
- val model = LogisticRegressionLocalRandomSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
+ val model = LogisticRegressionLocalRandomSGD.train(
+ data, args(4).toInt, args(2).toDouble, args(3).toDouble)
sc.stop()
}
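
The train overloads documented in this file all take an RDD of (Int, Array[Double]) pairs, such as the one produced by MLUtils.loadLabeledData in the main method above. A minimal usage sketch, assuming a local master; the input path, step size, and iteration count are placeholders, not values from this commit:

    import spark.SparkContext
    import spark.mllib.classification.LogisticRegressionLocalRandomSGD
    import spark.mllib.util.MLUtils

    object LogisticRegressionExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LogisticRegressionExample")
        // loadLabeledData yields (Double, Array[Double]); the label is converted to Int as above.
        val data = MLUtils.loadLabeledData(sc, "/tmp/lr-data").map(yx => (yx._1.toInt, yx._2))
        // 20 iterations of gradient descent, step size 0.1, full data set each iteration.
        val model = LogisticRegressionLocalRandomSGD.train(data, 20, 0.1, 1.0)
        sc.stop()
      }
    }
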
diff --git a/mllib/src/main/scala/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
index 15b689e7e0..3a6a12814a 100644
--- a/mllib/src/main/scala/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/spark/mllib/classification/SVM.scala
@@ -53,8 +53,8 @@ class SVMModel(
-class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
- var numIters: Int)
+class SVMLocalRandomSGD private (var stepSize: Double, var regParam: Double,
+ var miniBatchFraction: Double, var numIters: Int)
extends Logging {
/**
@@ -163,7 +163,8 @@ object SVMLocalRandomSGD {
initialWeights: Array[Double])
: SVMModel =
{
- new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+ new SVMLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
+ input, initialWeights)
}
/**
@@ -190,8 +191,8 @@ object SVMLocalRandomSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. We use the entire data set to update
- * the gradient in each iteration.
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
@@ -211,8 +212,8 @@ object SVMLocalRandomSGD {
/**
* Train a SVM model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update
- * the gradient in each iteration.
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
diff --git a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
index d875d6de50..b402c71ed2 100644
--- a/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/spark/mllib/clustering/KMeans.scala
@@ -315,14 +315,15 @@ object KMeans {
}
def main(args: Array[String]) {
- if (args.length != 4) {
- println("Usage: KMeans <master> <input_file> <k> <max_iterations>")
+ if (args.length < 4) {
+ println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
System.exit(1)
}
val (master, inputFile, k, iters) = (args(0), args(1), args(2).toInt, args(3).toInt)
+ val runs = if (args.length >= 5) args(4).toInt else 1
val sc = new SparkContext(master, "KMeans")
- val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble))
- val model = KMeans.train(data, k, iters)
+ val data = sc.textFile(inputFile).map(line => line.split(' ').map(_.toDouble)).cache()
+ val model = KMeans.train(data, k, iters, runs)
val cost = model.computeCost(data)
println("Cluster centers:")
for (c <- model.clusterCenters) {
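
KMeans.main above now accepts an optional <runs> argument, and the same call can be made directly against the KMeans object. A sketch under the assumption that the input file holds space-separated doubles, as in the command-line path above; the input path and parameter values are placeholders:

    import spark.SparkContext
    import spark.mllib.clustering.KMeans

    object KMeansExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "KMeansExample")
        val data = sc.textFile("/tmp/kmeans-data")
          .map(line => line.split(' ').map(_.toDouble))
          .cache()
        // k = 3 clusters, at most 10 iterations, best of 5 random initializations.
        val model = KMeans.train(data, 3, 10, 5)
        println("Cost: " + model.computeCost(data))
        sc.stop()
      }
    }
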
diff --git a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
index d4b83a1456..19cda26446 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/GradientDescent.scala
@@ -61,7 +61,7 @@ object GradientDescent {
// Initialize weights as a column vector
var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
- var reg_val = 0.0
+ var regVal = 0.0
for (i <- 1 to numIters) {
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42+i).map {
@@ -71,15 +71,14 @@ object GradientDescent {
(grad, loss)
}.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
- stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
+ /**
+ * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
+ * and regVal is the regularization value computed in the previous iteration as well.
+ */
+ stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
val update = updater.compute(weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
weights = update._1
- reg_val = update._2
- stochasticLossHistory.append(lossSum / miniBatchSize + reg_val)
- /*
- * NOTE(Xinghao): The loss here is sum of lossSum computed using the weights before applying updater,
- * and reg_val using weights after applying updater
- */
+ regVal = update._2
}
(weights.toArray, stochasticLossHistory.toArray)
diff --git a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
index 188fe7d972..3ebc1409b6 100644
--- a/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/spark/mllib/optimization/Updater.scala
@@ -23,36 +23,38 @@ import org.jblas.DoubleMatrix
abstract class Updater extends Serializable {
/**
* Compute an updated value for weights given the gradient, stepSize and iteration number.
+ * Also returns the regularization value computed using the *updated* weights.
*
- * @param weightsOlds - Column matrix of size nx1 where n is the number of features.
+ * @param weightsOld - Column matrix of size nx1 where n is the number of features.
* @param gradient - Column matrix of size nx1 where n is the number of features.
* @param stepSize - step size across iterations
* @param iter - Iteration number
* @param regParam - Regularization parameter
*
* @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
- * and the second element is the regularization value.
+ * and the second element is the regularization value computed using updated weights.
*/
- def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int, regParam: Double):
- (DoubleMatrix, Double)
+ def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
+ regParam: Double): (DoubleMatrix, Double)
}
class SimpleUpdater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
- val normGradient = gradient.mul(stepSize / math.sqrt(iter))
+ val thisIterStepSize = stepSize / math.sqrt(iter)
+ val normGradient = gradient.mul(thisIterStepSize)
(weightsOld.sub(normGradient), 0)
}
}
/**
-* L1 regularization -- corresponding proximal operator is the soft-thresholding function
-* That is, each weight component is shrunk towards 0 by shrinkageVal
-* If w > shrinkageVal, set weight component to w-shrinkageVal.
-* If w < -shrinkageVal, set weight component to w+shrinkageVal.
-* If -shrinkageVal < w < shrinkageVal, set weight component to 0.
-* Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
-**/
+ * L1 regularization -- the corresponding proximal operator is the soft-thresholding function.
+ * That is, each weight component is shrunk towards 0 by shrinkageVal.
+ * If w > shrinkageVal, set weight component to w-shrinkageVal.
+ * If w < -shrinkageVal, set weight component to w+shrinkageVal.
+ * If -shrinkageVal < w < shrinkageVal, set weight component to 0.
+ * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal).
+ */
class L1Updater extends Updater {
override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
@@ -62,10 +64,10 @@ class L1Updater extends Updater {
val newWeights = weightsOld.sub(normGradient)
// Soft thresholding
val shrinkageVal = regParam * thisIterStepSize
- (0 until newWeights.length).foreach(i => {
+ (0 until newWeights.length).foreach { i =>
val wi = newWeights.get(i)
newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
- })
+ }
(newWeights, newWeights.norm1 * regParam)
}
}
@@ -76,7 +78,7 @@ class SquaredL2Updater extends Updater {
val thisIterStepSize = stepSize / math.sqrt(iter)
val normGradient = gradient.mul(thisIterStepSize)
val newWeights = weightsOld.sub(normGradient).div(2.0 * thisIterStepSize * regParam + 1.0)
- (newWeights, pow(newWeights.norm2,2.0) * regParam)
+ (newWeights, pow(newWeights.norm2, 2.0) * regParam)
}
}
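
The L1Updater scaladoc above describes the soft-thresholding proximal operator that the compute method applies per weight. A standalone sketch of just that operator (the object and method names are illustrative, not part of the commit):

    import scala.math.{abs, max, signum}

    object SoftThresholdSketch {
      // Shrink a weight towards zero by shrinkageVal, as described in the L1Updater scaladoc.
      def softThreshold(w: Double, shrinkageVal: Double): Double =
        signum(w) * max(0.0, abs(w) - shrinkageVal)

      def main(args: Array[String]) {
        // With shrinkageVal = 0.5: 1.2 -> 0.7, -0.3 -> 0.0, -2.0 -> -1.5
        Seq(1.2, -0.3, -2.0).foreach(w => println(w + " -> " + softThreshold(w, 0.5)))
      }
    }
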
diff --git a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
index 7da96397a6..6ecf0151a1 100644
--- a/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/spark/mllib/recommendation/ALS.scala
@@ -35,8 +35,7 @@ import org.jblas.{DoubleMatrix, SimpleBlas, Solve}
* of the elements within this block, and the list of destination blocks that each user or
* product will need to send its feature vector to.
*/
-private[recommendation] case class OutLinkBlock(
- elementIds: Array[Int], shouldSend: Array[BitSet])
+private[recommendation] case class OutLinkBlock(elementIds: Array[Int], shouldSend: Array[BitSet])
/**
@@ -105,7 +104,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
}
/**
- * Run ALS with the configured parmeters on an input RDD of (user, product, rating) triples.
+ * Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
*/
def train(ratings: RDD[(Int, Int, Double)]): MatrixFactorizationModel = {
@@ -419,6 +418,7 @@ object ALS {
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
System.setProperty("spark.kryo.referenceTracking", "false")
+ System.setProperty("spark.kryoserializer.buffer.mb", "8")
System.setProperty("spark.locality.wait", "10000")
val sc = new SparkContext(master, "ALS")
val ratings = sc.textFile(ratingsFile).map { line =>
diff --git a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
index 1952658bb2..e8b1ed8a48 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/Lasso.scala
@@ -53,8 +53,8 @@ class LassoModel(
}
-class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double, var miniBatchFraction: Double,
- var numIters: Int)
+class LassoLocalRandomSGD private (var stepSize: Double, var regParam: Double,
+ var miniBatchFraction: Double, var numIters: Int)
extends Logging {
/**
@@ -163,7 +163,8 @@ object LassoLocalRandomSGD {
initialWeights: Array[Double])
: LassoModel =
{
- new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(input, initialWeights)
+ new LassoLocalRandomSGD(stepSize, regParam, miniBatchFraction, numIterations).train(
+ input, initialWeights)
}
/**
@@ -190,8 +191,8 @@ object LassoLocalRandomSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using the specified step size. We use the entire data set to update
- * the gradient in each iteration.
+ * of iterations of gradient descent using the specified step size. We use the entire data set to
+ * update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param stepSize Step size to be used for each iteration of Gradient Descent.
@@ -211,8 +212,8 @@ object LassoLocalRandomSGD {
/**
* Train a Lasso model given an RDD of (label, features) pairs. We run a fixed number
- * of iterations of gradient descent using a step size of 1.0. We use the entire data set to update
- * the gradient in each iteration.
+ * of iterations of gradient descent using a step size of 1.0. We use the entire data set to
+ * update the gradient in each iteration.
*
* @param input RDD of (label, array of features) pairs.
* @param numIterations Number of iterations of gradient descent to run.
diff --git a/mllib/src/main/scala/spark/mllib/regression/Regression.scala b/mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala
index b845ba1a89..b845ba1a89 100644
--- a/mllib/src/main/scala/spark/mllib/regression/Regression.scala
+++ b/mllib/src/main/scala/spark/mllib/regression/RegressionModel.scala
diff --git a/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
new file mode 100644
index 0000000000..8f95cf7479
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/KMeansDataGenerator.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.util
+
+import scala.util.Random
+
+import spark.{RDD, SparkContext}
+
+object KMeansDataGenerator {
+
+ /**
+ * Generate an RDD containing test data for KMeans. This function chooses k cluster centers
+ * from a d-dimensional Gaussian distribution scaled by factor r, then creates a Gaussian
+ * cluster with scale 1 around each center.
+ *
+ * @param sc SparkContext to use for creating the RDD
+ * @param numPoints Number of points that will be contained in the RDD
+ * @param k Number of clusters
+ * @param d Number of dimensions
+   * @param r Scaling factor for the distribution of the initial centers
+ * @param numPartitions Number of partitions of the generated RDD; default 2
+ */
+ def generateKMeansRDD(
+ sc: SparkContext,
+ numPoints: Int,
+ k: Int,
+ d: Int,
+ r: Double,
+ numPartitions: Int = 2)
+ : RDD[Array[Double]] =
+ {
+ // First, generate some centers
+ val rand = new Random(42)
+ val centers = Array.fill(k)(Array.fill(d)(rand.nextGaussian() * r))
+ // Then generate points around each center
+ sc.parallelize(0 until numPoints, numPartitions).map { idx =>
+ val center = centers(idx % k)
+ val rand2 = new Random(42 + idx)
+ Array.tabulate(d)(i => center(i) + rand2.nextGaussian())
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 6) {
+ println("Usage: KMeansGenerator " +
+ "<master> <output_dir> <num_points> <k> <d> <r> [<num_partitions>]")
+ System.exit(1)
+ }
+
+ val sparkMaster = args(0)
+ val outputPath = args(1)
+ val numPoints = args(2).toInt
+ val k = args(3).toInt
+ val d = args(4).toInt
+ val r = args(5).toDouble
+ val parts = if (args.length >= 7) args(6).toInt else 2
+
+ val sc = new SparkContext(sparkMaster, "KMeansDataGenerator")
+ val data = generateKMeansRDD(sc, numPoints, k, d, r, parts)
+ data.map(_.mkString(" ")).saveAsTextFile(outputPath)
+
+ System.exit(0)
+ }
+}
+
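
Besides the command-line entry point above, generateKMeansRDD can be called directly with the signature defined in this file. A minimal usage sketch; the output path and sizes are placeholders:

    import spark.SparkContext
    import spark.mllib.util.KMeansDataGenerator

    object GenerateKMeansDataExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "GenerateKMeansDataExample")
        // 10,000 points in 5 dimensions around k = 3 centers scaled by r = 4.0, in 2 partitions.
        val data = KMeansDataGenerator.generateKMeansRDD(sc, 10000, 3, 5, 4.0, 2)
        data.map(_.mkString(" ")).saveAsTextFile("/tmp/kmeans-data")
        sc.stop()
      }
    }
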
diff --git a/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
new file mode 100644
index 0000000000..88992cde0c
--- /dev/null
+++ b/mllib/src/main/scala/spark/mllib/util/MFDataGenerator.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.mllib.recommendation
+
+import scala.util.Random
+
+import org.jblas.DoubleMatrix
+
+import spark.{RDD, SparkContext}
+import spark.mllib.util.MLUtils
+
+/**
+ * Generate RDD(s) containing data for Matrix Factorization.
+ *
+ * This method samples training entries according to the oversampling factor
+ * 'trainSampFact', which is a multiplicative factor of the number of
+ * degrees of freedom of the matrix: rank*(m+n-rank).
+ *
+ * It optionally samples entries for a testing matrix using
+ * 'testSampFact', the percentage of the number of training entries
+ * to use for testing.
+ *
+ * This method takes the following inputs:
+ *   sparkMaster   (String)  The master URL.
+ *   outputPath    (String)  Directory to save output.
+ *   m             (Int)     Number of rows in data matrix.
+ *   n             (Int)     Number of columns in data matrix.
+ *   rank          (Int)     Underlying rank of data matrix.
+ *   trainSampFact (Double)  Oversampling factor.
+ *   noise         (Boolean) Whether to add Gaussian noise to training data.
+ *   sigma         (Double)  Standard deviation of added Gaussian noise.
+ *   test          (Boolean) Whether to create testing RDD.
+ *   testSampFact  (Double)  Percentage of training data to use as test data.
+ */
+
+object MFDataGenerator {
+
+ def main(args: Array[String]) {
+ if (args.length < 2) {
+ println("Usage: MFDataGenerator " +
+ "<master> <outputDir> [m] [n] [rank] [trainSampFact] [noise] [sigma] [test] [testSampFact]")
+ System.exit(1)
+ }
+
+ val sparkMaster: String = args(0)
+ val outputPath: String = args(1)
+ val m: Int = if (args.length > 2) args(2).toInt else 100
+ val n: Int = if (args.length > 3) args(3).toInt else 100
+ val rank: Int = if (args.length > 4) args(4).toInt else 10
+ val trainSampFact: Double = if (args.length > 5) args(5).toDouble else 1.0
+ val noise: Boolean = if (args.length > 6) args(6).toBoolean else false
+ val sigma: Double = if (args.length > 7) args(7).toDouble else 0.1
+ val test: Boolean = if (args.length > 8) args(8).toBoolean else false
+ val testSampFact: Double = if (args.length > 9) args(9).toDouble else 0.1
+
+ val sc = new SparkContext(sparkMaster, "MFDataGenerator")
+
+ val A = DoubleMatrix.randn(m, rank)
+ val B = DoubleMatrix.randn(rank, n)
+ val z = 1 / (scala.math.sqrt(scala.math.sqrt(rank)))
+ A.mmuli(z)
+ B.mmuli(z)
+ val fullData = A.mmul(B)
+
+ val df = rank * (m + n - rank)
+ val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
+ scala.math.round(.99 * m * n)).toInt
+ val rand = new Random()
+ val mn = m * n
+ val shuffled = rand.shuffle(1 to mn toIterable)
+
+ val omega = shuffled.slice(0, sampSize)
+ val ordered = omega.sortWith(_ < _).toArray
+ val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
+ .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+
+    // Optionally add Gaussian noise; keep the mapped RDD, otherwise the noise would be discarded.
+    val noisyData = if (noise) {
+      // Seed a Random per entry inside the closure so the driver-side `rand` is not captured.
+      trainData.map(x => (x._1, x._2, x._3 + new Random(x._1 * n + x._2).nextGaussian * sigma))
+    } else {
+      trainData
+    }
+
+    noisyData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+
+ // optionally generate testing data
+ if (test) {
+ val testSampSize = scala.math
+      .min(scala.math.round(sampSize * testSampFact), scala.math.round(mn - sampSize)).toInt
+ val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
+ val testOrdered = testOmega.sortWith(_ < _).toArray
+ val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
+ .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
+ }
+
+ sc.stop()
+
+ }
+} \ No newline at end of file
diff --git a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
index b5e564df6d..25d9673004 100644
--- a/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/spark/mllib/util/MLUtils.scala
@@ -38,9 +38,9 @@ object MLUtils {
*/
def loadLabeledData(sc: SparkContext, dir: String): RDD[(Double, Array[Double])] = {
sc.textFile(dir).map { line =>
- val parts = line.split(",")
+ val parts = line.split(',')
val label = parts(0).toDouble
- val features = parts(1).trim().split(" ").map(_.toDouble)
+ val features = parts(1).trim().split(' ').map(_.toDouble)
(label, features)
}
}
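
After this change, loadLabeledData splits each line on a single comma between the label and the features and on single spaces between features, i.e. one "label,f1 f2 f3" record per line. A small sketch of reading such a file; the path and sample records are illustrative:

    import spark.SparkContext
    import spark.mllib.util.MLUtils

    object LoadLabeledDataExample {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "LoadLabeledDataExample")
        // The file is expected to contain lines such as:
        //   1.0,2.5 0.1 7.0
        //   0.0,1.1 3.2 0.4
        val labeled = MLUtils.loadLabeledData(sc, "/tmp/labeled-points")
        labeled.take(2).foreach { case (label, features) =>
          println(label + " -> " + features.mkString(" "))
        }
        sc.stop()
      }
    }
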
diff --git a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
index d3fe58a382..8664263935 100644
--- a/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -21,11 +21,12 @@ import scala.util.Random
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
import spark.SparkContext
-class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
+class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll with ShouldMatchers {
val sc = new SparkContext("local", "test")
override def afterAll() {
@@ -64,8 +65,8 @@ class LogisticRegressionSuite extends FunSuite with BeforeAndAfterAll {
val numOffPredictions = predictions.zip(input).filter { case (prediction, (expected, _)) =>
(prediction != expected)
}.size
- // At least 80% of the predictions should be on.
- assert(numOffPredictions < input.length / 5)
+ // At least 83% of the predictions should be on.
+ ((input.length - numOffPredictions).toDouble / input.length) should be > 0.83
}
// Test if we can correctly learn A, B where Y = logistic(A + B*X)
diff --git a/pom.xml b/pom.xml
index 44729bd422..f8fbc12aa5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@
<module>core</module>
<module>bagel</module>
<module>examples</module>
+ <module>mllib</module>
<module>tools</module>
<module>streaming</module>
<module>repl</module>
@@ -183,6 +184,11 @@
<version>0.8.4</version>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.0.5</version>
+ </dependency>
+ <dependency>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
<version>4.0</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 9920e00a67..c822f49e78 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -44,7 +44,7 @@ object SparkBuild extends Build {
lazy val core = Project("core", file("core"), settings = coreSettings)
- lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core)
+ lazy val repl = Project("repl", file("repl"), settings = replSettings) dependsOn (core) dependsOn(bagel) dependsOn(mllib)
lazy val examples = Project("examples", file("examples"), settings = examplesSettings) dependsOn (core) dependsOn (streaming)
@@ -151,6 +151,7 @@ object SparkBuild extends Build {
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeAsm = ExclusionRule(organization = "asm")
+ val excludeSnappy = ExclusionRule(organization = "org.xerial.snappy")
def coreSettings = sharedSettings ++ Seq(
name := "spark-core",
@@ -168,6 +169,7 @@ object SparkBuild extends Build {
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
"commons-daemon" % "commons-daemon" % "1.0.10",
"com.ning" % "compress-lzf" % "0.8.4",
+ "org.xerial.snappy" % "snappy-java" % "1.0.5",
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
"com.typesafe.akka" % "akka-actor" % "2.0.5" excludeAll(excludeNetty),
@@ -235,6 +237,7 @@ object SparkBuild extends Build {
exclude("jline","jline")
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
+ excludeAll(excludeSnappy)
)
)
@@ -242,7 +245,9 @@ object SparkBuild extends Build {
name := "spark-tools"
)
- def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
+ def bagelSettings = sharedSettings ++ Seq(
+ name := "spark-bagel"
+ )
def mllibSettings = sharedSettings ++ Seq(
name := "spark-mllib",
@@ -257,7 +262,7 @@ object SparkBuild extends Build {
"Akka Repository" at "http://repo.akka.io/releases/"
),
libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
+ "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
"com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8734cacb0b..51c2cb9806 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -756,9 +756,8 @@ class PipelinedRDD(RDD):
self.ctx._gateway._gateway_client)
self.ctx._pickled_broadcast_vars.clear()
class_manifest = self._prev_jrdd.classManifest()
- env = copy.copy(self.ctx.environment)
- env['PYTHONPATH'] = os.environ.get("PYTHONPATH", "")
- env = MapConverter().convert(env, self.ctx._gateway._gateway_client)
+ env = MapConverter().convert(self.ctx.environment,
+ self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator, class_manifest)
diff --git a/repl/src/main/scala/spark/repl/SparkILoop.scala b/repl/src/main/scala/spark/repl/SparkILoop.scala
index 59f9d05683..0cecbd71ad 100644
--- a/repl/src/main/scala/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/spark/repl/SparkILoop.scala
@@ -831,6 +831,10 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
var sparkContext: SparkContext = null
def createSparkContext(): SparkContext = {
+ val uri = System.getenv("SPARK_EXECUTOR_URI")
+ if (uri != null) {
+ System.setProperty("spark.executor.uri", uri)
+ }
val master = this.master match {
case Some(m) => m
case None => {
diff --git a/run b/run
index 4cffda4708..0a440627a1 100755
--- a/run
+++ b/run
@@ -72,7 +72,10 @@ esac
# hard to kill the child with stuff like Process.destroy(). However, for
# the Spark shell, the wrapper is necessary to properly reset the terminal
# when we exit, so we allow it to set a variable to launch with scala.
-if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+# We still fall back on java for the shell if this is a "release" created
+# from make-distribution.sh since it's possible scala is not installed
+# but we have everything we need to run the shell.
+if [[ "$SPARK_LAUNCH_WITH_SCALA" == "1" && ! -f "$FWDIR/RELEASE" ]]; then
if [ "$SCALA_HOME" ]; then
RUNNER="${SCALA_HOME}/bin/scala"
else
diff --git a/streaming/src/main/scala/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
index 1e4c1e3742..070d930b5e 100644
--- a/streaming/src/main/scala/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/spark/streaming/Checkpoint.scala
@@ -17,16 +17,17 @@
package spark.streaming
-import spark.{Logging, Utils}
-
-import org.apache.hadoop.fs.{FileUtil, Path}
-import org.apache.hadoop.conf.Configuration
-
import java.io._
-import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import java.util.concurrent.Executors
import java.util.concurrent.RejectedExecutionException
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import spark.Logging
+import spark.io.CompressionCodec
+
+
private[streaming]
class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
extends Logging with Serializable {
@@ -49,6 +50,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
}
}
+
/**
* Convenience class to speed up the writing of graph checkpoint to file
*/
@@ -66,6 +68,8 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
val maxAttempts = 3
val executor = Executors.newFixedThreadPool(1)
+ private val compressionCodec = CompressionCodec.createCodec()
+
// Removed code which validates whether there is only one CheckpointWriter per path 'file' since
// I did not notice any errors - reintroduce it ?
@@ -103,7 +107,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
def write(checkpoint: Checkpoint) {
val bos = new ByteArrayOutputStream()
- val zos = new LZFOutputStream(bos)
+ val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
@@ -137,6 +141,8 @@ object CheckpointReader extends Logging {
val fs = new Path(path).getFileSystem(new Configuration())
val attempts = Seq(new Path(path, "graph"), new Path(path, "graph.bk"), new Path(path), new Path(path + ".bk"))
+ val compressionCodec = CompressionCodec.createCodec()
+
attempts.foreach(file => {
if (fs.exists(file)) {
logInfo("Attempting to load checkpoint from file '" + file + "'")
@@ -147,7 +153,7 @@ object CheckpointReader extends Logging {
// of ObjectInputStream is used to explicitly use the current thread's default class
// loader to find and load classes. This is a well know Java issue and has popped up
// in other places (e.g., http://jira.codehaus.org/browse/GROOVY-1627)
- val zis = new LZFInputStream(fis)
+ val zis = compressionCodec.compressedInputStream(fis)
val ois = new ObjectInputStreamWithLoader(zis, Thread.currentThread().getContextClassLoader)
val cp = ois.readObject.asInstanceOf[Checkpoint]
ois.close()
@@ -170,7 +176,9 @@ object CheckpointReader extends Logging {
}
private[streaming]
-class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader) extends ObjectInputStream(inputStream_) {
+class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoader)
+ extends ObjectInputStream(inputStream_) {
+
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
return loader.loadClass(desc.getName())