aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-10-10 09:42:23 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-10-10 09:42:23 +0530
commit26860639c5fee7fc23db1e686f8eb202921e4314 (patch)
treee05e555fcd713a7eb15680ae078994d70f396135
parent7d50f9f87baeb1f4b8d77d669d25649b97dd1d57 (diff)
parent7be75682b931dd52014f3cfdc6887e54583ad0af (diff)
downloadspark-26860639c5fee7fc23db1e686f8eb202921e4314.tar.gz
spark-26860639c5fee7fc23db1e686f8eb202921e4314.tar.bz2
spark-26860639c5fee7fc23db1e686f8eb202921e4314.zip
Merge branch 'scala-2.10' of github.com:ScrapCodes/spark into scala-2.10
Conflicts: core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala project/SparkBuild.scala
-rw-r--r--README.md1
-rw-r--r--assembly/pom.xml20
-rw-r--r--bagel/pom.xml6
-rw-r--r--core/pom.xml13
-rw-r--r--core/src/main/scala/org/apache/spark/CacheManager.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/TaskEndReason.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala10
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/executor/Executor.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala140
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala38
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Pool.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala)10
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala)8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala)2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala15
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala)5
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala57
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala169
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala124
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala)20
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala)22
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala21
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/storage/MemoryStore.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala5
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala13
-rw-r--r--core/src/test/scala/org/apache/spark/DistributedSuite.scala1
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala39
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala6
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala15
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala58
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala113
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/util/UtilsSuite.scala11
-rw-r--r--docs/_config.yml4
-rwxr-xr-xdocs/_layouts/global.html4
-rw-r--r--docs/running-on-yarn.md1
-rw-r--r--ec2/README2
-rwxr-xr-xec2/spark_ec2.py110
-rw-r--r--examples/pom.xml28
-rwxr-xr-xmake-distribution.sh2
-rw-r--r--mllib/pom.xml10
-rw-r--r--pom.xml6
-rw-r--r--project/SparkBuild.scala22
-rw-r--r--python/pyspark/rdd.py10
-rw-r--r--python/pyspark/serializers.py4
-rw-r--r--python/pyspark/shell.py2
-rw-r--r--repl-bin/pom.xml12
-rw-r--r--repl/pom.xml18
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala2
-rw-r--r--streaming/pom.xml7
-rw-r--r--tools/pom.xml8
-rw-r--r--yarn/pom.xml8
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala7
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala6
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala5
83 files changed, 1071 insertions, 403 deletions
diff --git a/README.md b/README.md
index 87d53da8da..7d3f9d4845 100644
--- a/README.md
+++ b/README.md
@@ -108,3 +108,4 @@ project's open source license. Whether or not you state this explicitly, by
submitting any copyrighted material via pull request, email, or other means
you agree to license the material under the project's open source license and
warrant that you have the legal authority to do so.
+
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 47a110ca6c..28b0692dff 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-assembly</artifactId>
+ <artifactId>spark-assembly_${scala-short.version}</artifactId>
<name>Spark Project Assembly</name>
<url>http://spark.incubator.apache.org/</url>
@@ -41,27 +41,27 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel</artifactId>
+ <artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib</artifactId>
+ <artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl</artifactId>
+ <artifactId>spark-repl_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming</artifactId>
+ <artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -104,13 +104,13 @@
</goals>
<configuration>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
</transformer>
</transformers>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
@@ -128,7 +128,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn</artifactId>
+ <artifactId>spark-yarn_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
diff --git a/bagel/pom.xml b/bagel/pom.xml
index feaed6d2b0..c8b9c4f4cd 100644
--- a/bagel/pom.xml
+++ b/bagel/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel</artifactId>
+ <artifactId>spark-bagel_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>
<url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/core/pom.xml b/core/pom.xml
index 8d9f0e386f..595240b5e5 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Core</name>
<url>http://spark.incubator.apache.org/</url>
@@ -39,7 +39,6 @@
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
- <version>0.7.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
@@ -199,14 +198,14 @@
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
- <property name="spark.classpath" refid="maven.test.classpath"/>
- <property environment="env"/>
+ <property name="spark.classpath" refid="maven.test.classpath" />
+ <property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
- <isset property="env.SCALA_HOME"/>
- <isset property="env.SCALA_LIBRARY_PATH"/>
+ <isset property="env.SCALA_HOME" />
+ <isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 68b99ca125..4cf7eb96da 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -26,28 +26,29 @@ import org.apache.spark.rdd.RDD
sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- private val loading = new HashSet[String]
+
+ /** Keys of RDD splits that are being computed/loaded. */
+ private val loading = new HashSet[String]()
/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
: Iterator[T] = {
val key = "rdd_%d_%d".format(rdd.id, split.index)
- logInfo("Cache key is " + key)
+ logDebug("Looking for partition " + key)
blockManager.get(key) match {
- case Some(cachedValues) =>
- // Partition is in cache, so just return its values
- logInfo("Found partition in cache!")
- return cachedValues.asInstanceOf[Iterator[T]]
+ case Some(values) =>
+ // Partition is already materialized, so just return its values
+ return values.asInstanceOf[Iterator[T]]
case None =>
// Mark the split as loading (unless someone else marks it first)
loading.synchronized {
if (loading.contains(key)) {
- logInfo("Loading contains " + key + ", waiting...")
+ logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
while (loading.contains(key)) {
try {loading.wait()} catch {case _ : Throwable =>}
}
- logInfo("Loading no longer contains " + key + ", so returning cached result")
+ logInfo("Finished waiting for %s".format(key))
// See whether someone else has successfully loaded it. The main way this would fail
// is for the RDD-level cache eviction policy if someone else has loaded the same RDD
// partition but we didn't want to make space for it. However, that case is unlikely
@@ -57,7 +58,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(values) =>
return values.asInstanceOf[Iterator[T]]
case None =>
- logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
+ logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
loading.add(key)
}
} else {
@@ -66,7 +67,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
}
try {
// If we got here, we have to load the split
- logInfo("Computing partition " + split)
+ logInfo("Partition %s not found, computing it".format(key))
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ef97fa85fa..efcc92e8e7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -53,14 +53,15 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor
import org.apache.mesos.MesosNativeLibrary
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
- ClusterScheduler, Schedulable, SchedulingMode}
+ ClusterScheduler}
import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
@@ -85,9 +86,11 @@ class SparkContext(
val sparkHome: String = null,
val jars: Seq[String] = Nil,
val environment: Map[String, String] = Map(),
- // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
- // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
+ // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc)
+ // too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
+ // of data-local splits on host
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+ scala.collection.immutable.Map())
extends Logging {
// Ensure logging is initialized before we spawn any threads
@@ -147,7 +150,7 @@ class SparkContext(
}
// Create and start the scheduler
- private var taskScheduler: TaskScheduler = {
+ private[spark] var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
@@ -240,7 +243,8 @@ class SparkContext(
val env = SparkEnv.get
val conf = env.hadoop.newConfiguration()
// Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+ if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+ System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
@@ -339,6 +343,8 @@ class SparkContext(
valueClass: Class[V],
minSplits: Int = defaultMinSplits
): RDD[(K, V)] = {
+ // Add necessary security credentials to the JobConf before broadcasting it.
+ SparkEnv.get.hadoop.addCredentials(conf)
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
@@ -349,10 +355,27 @@ class SparkContext(
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int = defaultMinSplits
- ) : RDD[(K, V)] = {
- val conf = new JobConf(hadoopConfiguration)
- FileInputFormat.setInputPaths(conf, path)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
+ ): RDD[(K, V)] = {
+ // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
+ val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ hadoopFile(path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
+ }
+
+ /**
+ * Get an RDD for a Hadoop file with an arbitray InputFormat. Accept a Hadoop Configuration
+ * that has already been broadcast, assuming that it's safe to use it to construct a
+ * HadoopFileRDD (i.e., except for file 'path', all other configuration properties can be resued).
+ */
+ def hadoopFile[K, V](
+ path: String,
+ confBroadcast: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int
+ ): RDD[(K, V)] = {
+ new HadoopFileRDD(
+ this, path, confBroadcast, inputFormatClass, keyClass, valueClass, minSplits)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 03bf268863..8466c2a004 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -46,6 +46,10 @@ private[spark] case class ExceptionFailure(
metrics: Option[TaskMetrics])
extends TaskEndReason
-private[spark] case class OtherFailure(message: String) extends TaskEndReason
+/**
+ * The task finished successfully, but the result was lost from the executor's block manager before
+ * it was fetched.
+ */
+private[spark] case object TaskResultLost extends TaskEndReason
-private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
+private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index feb2cab578..d7b45d4caa 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -69,6 +69,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType())
/**
+ * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+ * of the original partition.
+ */
+ def mapPartitionsWithIndex[R: ClassManifest](
+ f: JFunction2[Int, java.util.Iterator[T], java.util.Iterator[R]],
+ preservesPartitioning: Boolean = false): JavaRDD[R] =
+ new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))),
+ preservesPartitioning))
+
+ /**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[R](f: DoubleFunction[T]): JavaDoubleRDD =
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index b090c6edf3..2be4e323be 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -17,12 +17,13 @@
package org.apache.spark.api.python
-import org.apache.spark.Partitioner
import java.util.Arrays
+
+import org.apache.spark.Partitioner
import org.apache.spark.util.Utils
/**
- * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
+ * A [[org.apache.spark.Partitioner]] that performs handling of long-valued keys, for use by the Python API.
*
* Stores the unique id() of the Python-side partitioning function so that it is incorporated into
* equality comparisons. Correctness requires that the id is a unique identifier for the
@@ -30,6 +31,7 @@ import org.apache.spark.util.Utils
* function). This can be ensured by using the Python id() function and maintaining a reference
* to the Python partitioning function so that its id() is not reused.
*/
+
private[spark] class PythonPartitioner(
override val numPartitions: Int,
val pyPartitionFunctionId: Long)
@@ -37,7 +39,9 @@ private[spark] class PythonPartitioner(
override def getPartition(key: Any): Int = key match {
case null => 0
- case key: Array[Byte] => Utils.nonNegativeMod(Arrays.hashCode(key), numPartitions)
+ // we don't trust the Python partition function to return valid partition ID's so
+ // let's do a modulo numPartitions in any case
+ case key: Long => Utils.nonNegativeMod(key.toInt, numPartitions)
case _ => Utils.nonNegativeMod(key.hashCode(), numPartitions)
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index cb2db77f39..4d887cf195 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -187,14 +187,14 @@ private class PythonException(msg: String) extends Exception(msg)
* This is used by PySpark's shuffle operations.
*/
private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
- RDD[(Array[Byte], Array[Byte])](prev) {
+ RDD[(Long, Array[Byte])](prev) {
override def getPartitions = prev.partitions
override def compute(split: Partition, context: TaskContext) =
prev.iterator(split, context).grouped(2).map {
- case Seq(a, b) => (a, b)
+ case Seq(a, b) => (Utils.deserializeLongValue(a), b)
case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
}
- val asJavaPairRDD : JavaPairRDD[Array[Byte], Array[Byte]] = JavaPairRDD.fromRDD(this)
+ val asJavaPairRDD : JavaPairRDD[Long, Array[Byte]] = JavaPairRDD.fromRDD(this)
}
private[spark] object PythonRDD {
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 87a703427c..04d01c169d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -41,6 +41,7 @@ private[spark] object JsonProtocol {
("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
("name" -> obj.desc.name) ~
+ ("appuiurl" -> obj.appUiUrl) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerSlave) ~
@@ -64,7 +65,7 @@ private[spark] object JsonProtocol {
}
def writeMasterState(obj: MasterStateResponse) = {
- ("url" -> ("spark://" + obj.uri)) ~
+ ("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 0a5f4c368f..993ba6bd3d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -16,6 +16,9 @@
*/
package org.apache.spark.deploy
+
+import com.google.common.collect.MapMaker
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
@@ -24,11 +27,16 @@ import org.apache.hadoop.mapred.JobConf
* Contains util methods to interact with Hadoop from spark.
*/
class SparkHadoopUtil {
+ // A general, soft-reference map for metadata needed during HadoopRDD split computation
+ // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
+ private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop subsystems
+ // Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
+ // subsystems
def newConfiguration(): Configuration = new Configuration()
- // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
+ // Add any user credentials to the job conf which are necessary for running on a secure Hadoop
+ // cluster
def addCredentials(conf: JobConf) {}
def isYarnMode(): Boolean = { false }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 99a4a95e82..b4153f3533 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -17,7 +17,7 @@
package org.apache.spark.executor
-import java.io.{File}
+import java.io.File
import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent._
@@ -27,11 +27,11 @@ import scala.collection.mutable.HashMap
import org.apache.spark.scheduler._
import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
-
/**
- * The Mesos executor for Spark.
+ * Spark executor used with Mesos and the standalone scheduler.
*/
private[spark] class Executor(
executorId: String,
@@ -167,12 +167,20 @@ private[spark] class Executor(
// we need to serialize the task metrics first. If TaskMetrics had a custom serialized format, we could
// just change the relevants bytes in the byte buffer
val accumUpdates = Accumulators.values
- val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
- val serializedResult = ser.serialize(result)
- logInfo("Serialized size of result for " + taskId + " is " + serializedResult.limit)
- if (serializedResult.limit >= (akkaFrameSize - 1024)) {
- context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(TaskResultTooBigFailure()))
- return
+ val directResult = new DirectTaskResult(value, accumUpdates, task.metrics.getOrElse(null))
+ val serializedDirectResult = ser.serialize(directResult)
+ logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
+ val serializedResult = {
+ if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+ logInfo("Storing result for " + taskId + " in local BlockManager")
+ val blockId = "taskresult_" + taskId
+ env.blockManager.putBytes(
+ blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
+ ser.serialize(new IndirectTaskResult[Any](blockId))
+ } else {
+ logInfo("Sending result for " + taskId + " directly to driver")
+ serializedDirectResult
+ }
}
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
logInfo("Finished task ID " + taskId)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cb6734e41..d3b3fffd40 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -19,6 +19,7 @@ package org.apache.spark.rdd
import java.io.EOFException
+import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
@@ -26,10 +27,47 @@ import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv,
+ TaskContext}
+import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
+/**
+ * An RDD that reads a file (or multiple files) from Hadoop (e.g. files in HDFS, the local file
+ * system, or S3).
+ * This accepts a general, broadcasted Hadoop Configuration because those tend to remain the same
+ * across multiple reads; the 'path' is the only variable that is different across new JobConfs
+ * created from the Configuration.
+ */
+class HadoopFileRDD[K, V](
+ sc: SparkContext,
+ path: String,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends HadoopRDD[K, V](sc, broadcastedConf, inputFormatClass, keyClass, valueClass, minSplits) {
+
+ override def getJobConf(): JobConf = {
+ if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a new JobConf, set the input file/directory paths to read from, and cache the
+ // JobConf (i.e., in a shared hash map in the slave's JVM process that's accessible through
+ // HadoopRDD.putCachedMetadata()), so that we only create one copy across multiple
+ // getJobConf() calls for this RDD in the local process.
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ FileInputFormat.setInputPaths(newJobConf, path)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
+}
/**
* A Spark split class that wraps around a Hadoop InputSplit.
@@ -45,29 +83,80 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
}
/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
+ * A base class that provides core functionality for reading data partitions stored in Hadoop.
*/
class HadoopRDD[K, V](
sc: SparkContext,
- @transient conf: JobConf,
+ broadcastedConf: Broadcast[SerializableWritable[Configuration]],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minSplits: Int)
extends RDD[(K, V)](sc, Nil) with Logging {
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ def this(
+ sc: SparkContext,
+ conf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int) = {
+ this(
+ sc,
+ sc.broadcast(new SerializableWritable(conf))
+ .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ inputFormatClass,
+ keyClass,
+ valueClass,
+ minSplits)
+ }
+
+ protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
+
+ protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
+
+ // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
+ protected def getJobConf(): JobConf = {
+ val conf: Configuration = broadcastedConf.value.value
+ if (conf.isInstanceOf[JobConf]) {
+ // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
+ return conf.asInstanceOf[JobConf]
+ } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
+ // getJobConf() has been called previously, so there is already a local cache of the JobConf
+ // needed by this RDD.
+ return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+ } else {
+ // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
+ // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
+ // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ return newJobConf
+ }
+ }
+
+ protected def getInputFormat(conf: JobConf): InputFormat[K, V] = {
+ if (HadoopRDD.containsCachedMetadata(inputFormatCacheKey)) {
+ return HadoopRDD.getCachedMetadata(inputFormatCacheKey).asInstanceOf[InputFormat[K, V]]
+ }
+ // Once an InputFormat for this RDD is created, cache it so that only one reflection call is
+ // done in each local process.
+ val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[K, V]]
+ if (newInputFormat.isInstanceOf[Configurable]) {
+ newInputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
+ HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
+ return newInputFormat
+ }
override def getPartitions: Array[Partition] = {
- val env = SparkEnv.get
- env.hadoop.addCredentials(conf)
- val inputFormat = createInputFormat(conf)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(conf)
+ inputFormat.asInstanceOf[Configurable].setConf(jobConf)
}
- val inputSplits = inputFormat.getSplits(conf, minSplits)
+ val inputSplits = inputFormat.getSplits(jobConf, minSplits)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
@@ -75,22 +164,14 @@ class HadoopRDD[K, V](
array
}
- def createInputFormat(conf: JobConf): InputFormat[K, V] = {
- ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
- .asInstanceOf[InputFormat[K, V]]
- }
-
override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
var reader: RecordReader[K, V] = null
- val conf = confBroadcast.value.value
- val fmt = createInputFormat(conf)
- if (fmt.isInstanceOf[Configurable]) {
- fmt.asInstanceOf[Configurable].setConf(conf)
- }
- reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+ val jobConf = getJobConf()
+ val inputFormat = getInputFormat(jobConf)
+ reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
// Register an on-task-completion callback to close the input stream.
context.addOnCompleteCallback{ () => closeIfNeeded() }
@@ -127,5 +208,18 @@ class HadoopRDD[K, V](
// Do nothing. Hadoop RDD should not be checkpointed.
}
- def getConf: Configuration = confBroadcast.value.value
+ def getConf: Configuration = getJobConf()
+}
+
+private[spark] object HadoopRDD {
+ /**
+ * The three methods below are helpers for accessing the local map, a property of the SparkEnv of
+ * the local process.
+ */
+ def getCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.get(key)
+
+ def containsCachedMetadata(key: String) = SparkEnv.get.hadoop.hadoopJobMetadata.containsKey(key)
+
+ def putCachedMetadata(key: String, value: Any) =
+ SparkEnv.get.hadoop.hadoopJobMetadata.put(key, value)
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea4eeb7dbf..731ef90c90 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -756,24 +756,42 @@ abstract class RDD[T: ClassTag](
}
/**
- * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
- * it will be slow if a lot of partitions are required. In that case, use collect() to get the
- * whole RDD instead.
+ * Take the first num elements of the RDD. It works by first scanning one partition, and use the
+ * results from that partition to estimate the number of additional partitions needed to satisfy
+ * the limit.
*/
def take(num: Int): Array[T] = {
if (num == 0) {
return new Array[T](0)
}
+
val buf = new ArrayBuffer[T]
- var p = 0
- while (buf.size < num && p < partitions.size) {
+ val totalParts = this.partitions.length
+ var partsScanned = 0
+ while (buf.size < num && partsScanned < totalParts) {
+ // The number of partitions to try in this iteration. It is ok for this number to be
+ // greater than totalParts because we actually cap it at totalParts in runJob.
+ var numPartsToTry = 1
+ if (partsScanned > 0) {
+ // If we didn't find any rows after the first iteration, just try all partitions next.
+ // Otherwise, interpolate the number of partitions we need to try, but overestimate it
+ // by 50%.
+ if (buf.size == 0) {
+ numPartsToTry = totalParts - 1
+ } else {
+ numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
+ }
+ }
+ numPartsToTry = math.max(0, numPartsToTry) // guard against negative num of partitions
+
val left = num - buf.size
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
- buf ++= res(0)
- if (buf.size == num)
- return buf.toArray
- p += 1
+ val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
+ val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
+
+ res.foreach(buf ++= _.take(num - buf.size))
+ partsScanned += numPartsToTry
}
+
return buf.toArray
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index dca84db597..e79b67579e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -29,7 +29,6 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.storage.{BlockManager, BlockManagerMaster}
import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
@@ -554,7 +553,7 @@ class DAGScheduler(
SparkEnv.get.closureSerializer.newInstance().serialize(tasks.head)
} catch {
case e: NotSerializableException =>
- abortStage(stage, e.toString)
+ abortStage(stage, "Task not serializable: " + e.toString)
running -= stage
return
}
@@ -706,6 +705,9 @@ class DAGScheduler(
case ExceptionFailure(className, description, stackTrace, metrics) =>
// Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+ case TaskResultLost =>
+ // Do nothing here; the TaskScheduler handles these failures and resubmits the task.
+
case other =>
// Unrecognized failure - abort all jobs depending on this stage
abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 0d99670648..10ff1b4376 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
import java.util.Properties
-import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map
import org.apache.spark._
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index c8b78bf00a..3628b1b078 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -30,7 +30,6 @@ import scala.io.Source
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 35b32600da..9eb8d48501 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import org.apache.spark.Logging
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* An Schedulable entity that represent collection of Pools or TaskSetManagers
@@ -45,7 +45,7 @@ private[spark] class Pool(
var priority = 0
var stageId = 0
var name = poolName
- var parent:Schedulable = null
+ var parent: Pool = null
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
@@ -101,14 +101,14 @@ private[spark] class Pool(
return sortedTaskSetQueue
}
- override def increaseRunningTasks(taskNum: Int) {
+ def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
- override def decreaseRunningTasks(taskNum: Int) {
+ def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index f4726450ec..1c7ea2dccc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -15,9 +15,9 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import scala.collection.mutable.ArrayBuffer
/**
@@ -25,7 +25,7 @@ import scala.collection.mutable.ArrayBuffer
* there are two type of Schedulable entities(Pools and TaskSetManagers)
*/
private[spark] trait Schedulable {
- var parent: Schedulable
+ var parent: Pool
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulingMode: SchedulingMode
@@ -36,8 +36,6 @@ private[spark] trait Schedulable {
def stageId: Int
def name: String
- def increaseRunningTasks(taskNum: Int): Unit
- def decreaseRunningTasks(taskNum: Int): Unit
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 114617c51a..4e25086ec9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
index cbeed4731a..3418640b8c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingAlgorithm.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
/**
* An interface for sort algorithm
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
index 16013b3208..3832ee7ff6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulingMode.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingMode.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
/**
* "FAIR" and "FIFO" determines which policy is used
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c3cf4b8907..62b521ad45 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler
import java.util.Properties
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, SparkContext, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 72cb1c9ce8..b6f11969e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,8 +17,8 @@
package org.apache.spark.scheduler
-import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection._
+
import org.apache.spark.executor.TaskMetrics
case class StageInfo(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 309ac2f6c9..5190d234d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.apache.spark.util.SerializableBuffer
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 9685fb1a67..7c2a422aff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index 8d8d708612..d31a5d5177 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
private[spark] object TaskLocality
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 5c7e5bb977..db3954a9d3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -26,12 +26,17 @@ import java.nio.ByteBuffer
import org.apache.spark.util.Utils
// Task result. Also contains updates to accumulator variables.
-// TODO: Use of distributed cache to return result is a hack to get around
-// what seems to be a bug with messages over 60KB in libprocess; fix it
+private[spark] sealed trait TaskResult[T]
+
+/** A reference to a DirectTaskResult that has been stored in the worker's BlockManager. */
+private[spark]
+case class IndirectTaskResult[T](val blockId: String) extends TaskResult[T] with Serializable
+
+/** A TaskResult that contains the task's return value and accumulator updates. */
private[spark]
-class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
- extends Externalizable
-{
+class DirectTaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+ extends TaskResult[T] with Externalizable {
+
def this() = this(null.asInstanceOf[T], null, null)
override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 63be8ba3f5..7c2a9f03d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,10 +17,11 @@
package org.apache.spark.scheduler
-import org.apache.spark.scheduler.cluster.Pool
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+
/**
* Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
+ * Each TaskScheduler schedulers task for a single SparkContext.
* These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
* and are responsible for sending the tasks to the cluster, running them, retrying if there
* are failures, and mitigating stragglers. They return events to the DAGScheduler through
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
index 83be051c1a..593fa9fb93 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
@@ -17,7 +17,6 @@
package org.apache.spark.scheduler
-import org.apache.spark.scheduler.cluster.TaskInfo
import scala.collection.mutable.Map
import org.apache.spark.TaskEndReason
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 648a3ef922..90f6bcefac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -15,12 +15,11 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.TaskSet
/**
* Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
@@ -45,7 +44,5 @@ private[spark] trait TaskSetManager extends Schedulable {
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription]
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
-
def error(message: String)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 919acce828..1a844b7e7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -18,6 +18,9 @@
package org.apache.spark.scheduler.cluster
import java.lang.{Boolean => JBoolean}
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
@@ -26,10 +29,7 @@ import scala.collection.mutable.HashSet
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicLong
-import java.util.{TimerTask, Timer}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -55,7 +55,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
- val activeTaskSets = new HashMap[String, TaskSetManager]
+ // ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
+ // on this class.
+ val activeTaskSets = new HashMap[String, ClusterTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskIdToExecutorId = new HashMap[Long, String]
@@ -65,7 +67,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
@volatile private var hasLaunchedTask = false
private val starvationTimer = new Timer(true)
- // Incrementing Mesos task IDs
+ // Incrementing task IDs
val nextTaskId = new AtomicLong(0)
// Which executor IDs we have executors on
@@ -96,6 +98,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.scheduler.mode", "FIFO"))
+ // This is a var so that we can reset it for testing purposes.
+ private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
+
override def setListener(listener: TaskSchedulerListener) {
this.listener = listener
}
@@ -234,7 +239,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- var taskSetToUpdate: Option[TaskSetManager] = None
var failedExecutor: Option[String] = None
var taskFailed = false
synchronized {
@@ -249,9 +253,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
taskIdToTaskSetId.get(tid) match {
case Some(taskSetId) =>
- if (activeTaskSets.contains(taskSetId)) {
- taskSetToUpdate = Some(activeTaskSets(taskSetId))
- }
if (TaskState.isFinished(state)) {
taskIdToTaskSetId.remove(tid)
if (taskSetTaskIds.contains(taskSetId)) {
@@ -262,6 +263,15 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (state == TaskState.FAILED) {
taskFailed = true
}
+ activeTaskSets.get(taskSetId).foreach { taskSet =>
+ if (state == TaskState.FINISHED) {
+ taskSet.removeRunningTask(tid)
+ taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
+ } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
+ taskSet.removeRunningTask(tid)
+ taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
+ }
+ }
case None =>
logInfo("Ignoring update from TID " + tid + " because its task set is gone")
}
@@ -269,10 +279,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case e: Exception => logError("Exception in statusUpdate", e)
}
}
- // Update the task set and DAGScheduler without holding a lock on this, since that can deadlock
- if (taskSetToUpdate != None) {
- taskSetToUpdate.get.statusUpdate(tid, state, serializedData)
- }
+ // Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor != None) {
listener.executorLost(failedExecutor.get)
backend.reviveOffers()
@@ -283,6 +290,25 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
+ def handleSuccessfulTask(
+ taskSetManager: ClusterTaskSetManager,
+ tid: Long,
+ taskResult: DirectTaskResult[_]) = synchronized {
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
+ }
+
+ def handleFailedTask(
+ taskSetManager: ClusterTaskSetManager,
+ tid: Long,
+ taskState: TaskState,
+ reason: Option[TaskEndReason]) = synchronized {
+ taskSetManager.handleFailedTask(tid, taskState, reason)
+ if (taskState == TaskState.FINISHED) {
+ // The task finished successfully but the result was lost, so we should revive offers.
+ backend.reviveOffers()
+ }
+ }
+
def error(message: String) {
synchronized {
if (activeTaskSets.size > 0) {
@@ -311,6 +337,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
if (jarServer != null) {
jarServer.stop()
}
+ if (taskResultGetter != null) {
+ taskResultGetter.stop()
+ }
// sleeping for an arbitrary 5 seconds : to ensure that messages are sent out.
// TODO: Do something better !
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index f61fde6957..194ab55102 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -25,15 +25,12 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.math.max
import scala.math.min
+import scala.Some
-import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
-import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
+import org.apache.spark.{ExceptionFailure, FetchFailed, Logging, Resubmitted, SparkEnv,
+ SparkException, Success, TaskEndReason, TaskResultLost, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler._
-import scala.Some
-import org.apache.spark.FetchFailed
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.TaskResultTooBigFailure
import org.apache.spark.util.{SystemClock, Clock}
@@ -71,18 +68,20 @@ private[spark] class ClusterTaskSetManager(
val tasks = taskSet.tasks
val numTasks = tasks.length
val copiesRunning = new Array[Int](numTasks)
- val finished = new Array[Boolean](numTasks)
+ val successful = new Array[Boolean](numTasks)
val numFailures = new Array[Int](numTasks)
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
- var tasksFinished = 0
+ var tasksSuccessful = 0
var weight = 1
var minShare = 0
- var runningTasks = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
- var parent: Schedulable = null
+ var parent: Pool = null
+
+ var runningTasks = 0
+ private val runningTasksSet = new HashSet[Long]
// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
@@ -223,7 +222,7 @@ private[spark] class ClusterTaskSetManager(
while (!list.isEmpty) {
val index = list.last
list.trimEnd(1)
- if (copiesRunning(index) == 0 && !finished(index)) {
+ if (copiesRunning(index) == 0 && !successful(index)) {
return Some(index)
}
}
@@ -243,7 +242,7 @@ private[spark] class ClusterTaskSetManager(
private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
- speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
+ speculatableTasks.retain(index => !successful(index)) // Remove finished tasks from set
if (!speculatableTasks.isEmpty) {
// Check for process-local or preference-less tasks; note that tasks can be process-local
@@ -344,7 +343,7 @@ private[spark] class ClusterTaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
- if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
+ if (tasksSuccessful < numTasks && availableCpus >= CPUS_PER_TASK) {
val curTime = clock.getTime()
var allowedLocality = getAllowedLocalityLevel(curTime)
@@ -375,7 +374,7 @@ private[spark] class ClusterTaskSetManager(
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = clock.getTime() - startTime
- increaseRunningTasks(1)
+ addRunningTask(taskId)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
@@ -417,94 +416,61 @@ private[spark] class ClusterTaskSetManager(
index
}
- /** Called by cluster scheduler when one of our tasks changes state */
- override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- SparkEnv.set(env)
- state match {
- case TaskState.FINISHED =>
- taskFinished(tid, state, serializedData)
- case TaskState.LOST =>
- taskLost(tid, state, serializedData)
- case TaskState.FAILED =>
- taskLost(tid, state, serializedData)
- case TaskState.KILLED =>
- taskLost(tid, state, serializedData)
- case _ =>
- }
- }
-
- def taskStarted(task: Task[_], info: TaskInfo) {
+ private def taskStarted(task: Task[_], info: TaskInfo) {
sched.listener.taskStarted(task, info)
}
- def taskFinished(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ /**
+ * Marks the task as successful and notifies the listener that a task has ended.
+ */
+ def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]) = {
val info = taskInfos(tid)
- if (info.failed) {
- // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
- // or even from Mesos itself when acks get delayed.
- return
- }
val index = info.index
info.markSuccessful()
- decreaseRunningTasks(1)
- if (!finished(index)) {
- tasksFinished += 1
+ removeRunningTask(tid)
+ if (!successful(index)) {
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
- tid, info.duration, info.host, tasksFinished, numTasks))
- // Deserialize task result and pass it to the scheduler
- try {
- val result = ser.deserialize[TaskResult[_]](serializedData)
- result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(
- tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
- } catch {
- case cnf: ClassNotFoundException =>
- val loader = Thread.currentThread().getContextClassLoader
- throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
- case ex: Throwable => throw ex
- }
- // Mark finished and stop if we've finished all the tasks
- finished(index) = true
- if (tasksFinished == numTasks) {
+ tid, info.duration, info.host, tasksSuccessful, numTasks))
+ sched.listener.taskEnded(
+ tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+
+ // Mark successful and stop if all the tasks have succeeded.
+ tasksSuccessful += 1
+ successful(index) = true
+ if (tasksSuccessful == numTasks) {
sched.taskSetFinished(this)
}
} else {
- logInfo("Ignoring task-finished event for TID " + tid +
- " because task " + index + " is already finished")
+ logInfo("Ignorning task-finished event for TID " + tid + " because task " +
+ index + " has already completed successfully")
}
}
- def taskLost(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ /**
+ * Marks the task as failed, re-adds it to the list of pending tasks, and notifies the listener.
+ */
+ def handleFailedTask(tid: Long, state: TaskState, reason: Option[TaskEndReason]) {
val info = taskInfos(tid)
if (info.failed) {
- // We might get two task-lost messages for the same task in coarse-grained Mesos mode,
- // or even from Mesos itself when acks get delayed.
return
}
+ removeRunningTask(tid)
val index = info.index
info.markFailed()
- decreaseRunningTasks(1)
- if (!finished(index)) {
+ if (!successful(index)) {
logInfo("Lost TID %s (task %s:%d)".format(tid, taskSet.id, index))
copiesRunning(index) -= 1
// Check if the problem is a map output fetch failure. In that case, this
// task will never succeed on any node, so tell the scheduler about it.
- if (serializedData != null && serializedData.limit() > 0) {
- val reason = ser.deserialize[TaskEndReason](serializedData, getClass.getClassLoader)
- reason match {
+ reason.foreach {
+ _ match {
case fetchFailed: FetchFailed =>
logInfo("Loss was due to fetch failure from " + fetchFailed.bmAddress)
sched.listener.taskEnded(tasks(index), fetchFailed, null, null, info, null)
- finished(index) = true
- tasksFinished += 1
+ successful(index) = true
+ tasksSuccessful += 1
sched.taskSetFinished(this)
- decreaseRunningTasks(runningTasks)
- return
-
- case taskResultTooBig: TaskResultTooBigFailure =>
- logInfo("Loss was due to task %s result exceeding Akka frame size; aborting job".format(
- tid))
- abort("Task %s result exceeded Akka frame size".format(tid))
+ removeAllRunningTasks()
return
case ef: ExceptionFailure =>
@@ -534,13 +500,16 @@ private[spark] class ClusterTaskSetManager(
logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
+ case TaskResultLost =>
+ logInfo("Lost result for TID %s on host %s".format(tid, info.host))
+ sched.listener.taskEnded(tasks(index), TaskResultLost, null, null, info, null)
+
case _ => {}
}
}
// On non-fetch failures, re-enqueue the task as pending for a max number of retries
addPendingTask(index)
- // Count failed attempts only on FAILED and LOST state (not on KILLED)
- if (state == TaskState.FAILED || state == TaskState.LOST) {
+ if (state != TaskState.KILLED) {
numFailures(index) += 1
if (numFailures(index) > MAX_TASK_FAILURES) {
logError("Task %s:%d failed more than %d times; aborting job".format(
@@ -564,22 +533,36 @@ private[spark] class ClusterTaskSetManager(
causeOfFailure = message
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.listener.taskSetFailed(taskSet, message)
- decreaseRunningTasks(runningTasks)
+ removeAllRunningTasks()
sched.taskSetFinished(this)
}
- override def increaseRunningTasks(taskNum: Int) {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
+ /** If the given task ID is not in the set of running tasks, adds it.
+ *
+ * Used to keep track of the number of running tasks, for enforcing scheduling policies.
+ */
+ def addRunningTask(tid: Long) {
+ if (runningTasksSet.add(tid) && parent != null) {
+ parent.increaseRunningTasks(1)
+ }
+ runningTasks = runningTasksSet.size
+ }
+
+ /** If the given task ID is in the set of running tasks, removes it. */
+ def removeRunningTask(tid: Long) {
+ if (runningTasksSet.remove(tid) && parent != null) {
+ parent.decreaseRunningTasks(1)
}
+ runningTasks = runningTasksSet.size
}
- override def decreaseRunningTasks(taskNum: Int) {
- runningTasks -= taskNum
+ private def removeAllRunningTasks() {
+ val numRunningTasks = runningTasksSet.size
+ runningTasksSet.clear()
if (parent != null) {
- parent.decreaseRunningTasks(taskNum)
+ parent.decreaseRunningTasks(numRunningTasks)
}
+ runningTasks = 0
}
override def getSchedulableByName(name: String): Schedulable = {
@@ -615,10 +598,10 @@ private[spark] class ClusterTaskSetManager(
if (tasks(0).isInstanceOf[ShuffleMapTask]) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
- if (finished(index)) {
- finished(index) = false
+ if (successful(index)) {
+ successful(index) = false
copiesRunning(index) -= 1
- tasksFinished -= 1
+ tasksSuccessful -= 1
addPendingTask(index)
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
@@ -628,7 +611,7 @@ private[spark] class ClusterTaskSetManager(
}
// Also re-enqueue any tasks that were running on the node
for ((tid, info) <- taskInfos if info.running && info.executorId == execId) {
- taskLost(tid, TaskState.KILLED, null)
+ handleFailedTask(tid, TaskState.KILLED, None)
}
}
@@ -641,13 +624,13 @@ private[spark] class ClusterTaskSetManager(
*/
override def checkSpeculatableTasks(): Boolean = {
// Can't speculate if we only have one task, or if all tasks have finished.
- if (numTasks == 1 || tasksFinished == numTasks) {
+ if (numTasks == 1 || tasksSuccessful == numTasks) {
return false
}
var foundTasks = false
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
- if (tasksFinished >= minFinishedForSpeculation) {
+ if (tasksSuccessful >= minFinishedForSpeculation) {
val time = clock.getTime()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
@@ -658,7 +641,7 @@ private[spark] class ClusterTaskSetManager(
logDebug("Task length threshold for speculation: " + threshold)
for ((tid, info) <- taskInfos) {
val index = info.index
- if (!finished(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
+ if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
@@ -672,7 +655,7 @@ private[spark] class ClusterTaskSetManager(
}
override def hasPendingTasks(): Boolean = {
- numTasks > 0 && tasksFinished < numTasks
+ numTasks > 0 && tasksSuccessful < numTasks
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 9c36d221f6..c0b836bf1a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster
import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{Utils, SerializableBuffer}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 49f668eb32..b6f0ec961a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -28,6 +28,7 @@ import akka.pattern.ask
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
import org.apache.spark.{SparkException, Logging, TaskState}
+import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
new file mode 100644
index 0000000000..feec8ecfe4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskResultGetter.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{LinkedBlockingDeque, ThreadFactory, ThreadPoolExecutor, TimeUnit}
+
+import org.apache.spark._
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
+import org.apache.spark.serializer.SerializerInstance
+
+/**
+ * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
+ */
+private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+ extends Logging {
+ private val MIN_THREADS = System.getProperty("spark.resultGetter.minThreads", "4").toInt
+ private val MAX_THREADS = System.getProperty("spark.resultGetter.maxThreads", "4").toInt
+ private val getTaskResultExecutor = new ThreadPoolExecutor(
+ MIN_THREADS,
+ MAX_THREADS,
+ 0L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingDeque[Runnable],
+ new ResultResolverThreadFactory)
+
+ class ResultResolverThreadFactory extends ThreadFactory {
+ private var counter = 0
+ private var PREFIX = "Result resolver thread"
+
+ override def newThread(r: Runnable): Thread = {
+ val thread = new Thread(r, "%s-%s".format(PREFIX, counter))
+ counter += 1
+ thread.setDaemon(true)
+ return thread
+ }
+ }
+
+ protected val serializer = new ThreadLocal[SerializerInstance] {
+ override def initialValue(): SerializerInstance = {
+ return sparkEnv.closureSerializer.newInstance()
+ }
+ }
+
+ def enqueueSuccessfulTask(
+ taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ getTaskResultExecutor.execute(new Runnable {
+ override def run() {
+ try {
+ val result = serializer.get().deserialize[TaskResult[_]](serializedData) match {
+ case directResult: DirectTaskResult[_] => directResult
+ case IndirectTaskResult(blockId) =>
+ logDebug("Fetching indirect task result for TID %s".format(tid))
+ val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
+ if (!serializedTaskResult.isDefined) {
+ /* We won't be able to get the task result if the machine that ran the task failed
+ * between when the task ended and when we tried to fetch the result, or if the
+ * block manager had to flush the result. */
+ scheduler.handleFailedTask(
+ taskSetManager, tid, TaskState.FINISHED, Some(TaskResultLost))
+ return
+ }
+ val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
+ serializedTaskResult.get)
+ sparkEnv.blockManager.master.removeBlock(blockId)
+ deserializedResult
+ }
+ result.metrics.resultSize = serializedData.limit()
+ scheduler.handleSuccessfulTask(taskSetManager, tid, result)
+ } catch {
+ case cnf: ClassNotFoundException =>
+ val loader = Thread.currentThread.getContextClassLoader
+ taskSetManager.abort("ClassNotFound with classloader: " + loader)
+ case ex =>
+ taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
+ }
+ }
+ })
+ }
+
+ def enqueueFailedTask(taskSetManager: ClusterTaskSetManager, tid: Long, taskState: TaskState,
+ serializedData: ByteBuffer) {
+ var reason: Option[TaskEndReason] = None
+ getTaskResultExecutor.execute(new Runnable {
+ override def run() {
+ try {
+ if (serializedData != null && serializedData.limit() > 0) {
+ reason = Some(serializer.get().deserialize[TaskEndReason](
+ serializedData, getClass.getClassLoader))
+ }
+ } catch {
+ case cnd: ClassNotFoundException =>
+ // Log an error but keep going here -- the task failed, so not catastropic if we can't
+ // deserialize the reason.
+ val loader = Thread.currentThread.getContextClassLoader
+ logError(
+ "Could not deserialize TaskEndReason: ClassNotFound with classloader " + loader)
+ case ex => {}
+ }
+ scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
+ }
+ })
+ }
+
+ def stop() {
+ getTaskResultExecutor.shutdownNow()
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index babe875fa1..bf4040fafc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -15,22 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.mesos
+package org.apache.spark.scheduler.cluster.mesos
-import com.google.protobuf.ByteString
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
+import org.apache.spark.{SparkException, Logging, SparkContext, TaskState}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend}
/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 541f86e338..50cbc2ca92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -15,22 +15,24 @@
* limitations under the License.
*/
-package org.apache.spark.scheduler.mesos
+package org.apache.spark.scheduler.cluster.mesos
-import com.google.protobuf.ByteString
+import java.io.File
+import java.util.{ArrayList => JArrayList, List => JList}
+import java.util.Collections
+
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+import com.google.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import org.apache.spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import org.apache.spark.TaskState
+import org.apache.spark.{Logging, SparkException, SparkContext, TaskState}
+import org.apache.spark.scheduler.TaskDescription
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, ExecutorExited, ExecutorLossReason}
+import org.apache.spark.scheduler.cluster.{SchedulerBackend, SlaveLost, WorkerOffer}
import org.apache.spark.util.Utils
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index 8cb4d1396f..4d1bb1c639 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -31,8 +31,7 @@ import org.apache.spark._
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.ExecutorURLClassLoader
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import akka.actor._
import org.apache.spark.util.Utils
@@ -92,7 +91,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
var rootPool: Pool = null
val schedulingMode: SchedulingMode = SchedulingMode.withName(
System.getProperty("spark.scheduler.mode", "FIFO"))
- val activeTaskSets = new HashMap[String, TaskSetManager]
+ val activeTaskSets = new HashMap[String, LocalTaskSetManager]
val taskIdToTaskSetId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
@@ -211,7 +210,8 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
- val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
+ val taskResult = new DirectTaskResult(
+ result, accumUpdates, deserializedTask.metrics.getOrElse(null))
val serializedResult = ser.serialize(taskResult)
localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
index e52cb998bd..c2e2399ccb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalTaskSetManager.scala
@@ -21,16 +21,16 @@ import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
+import org.apache.spark.{ExceptionFailure, Logging, SparkEnv, SparkException, Success, TaskState}
import org.apache.spark.TaskState.TaskState
-import org.apache.spark.scheduler.{Task, TaskResult, TaskSet}
-import org.apache.spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
+import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Pool, Schedulable, Task,
+ TaskDescription, TaskInfo, TaskLocality, TaskResult, TaskSet, TaskSetManager}
private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
extends TaskSetManager with Logging {
- var parent: Schedulable = null
+ var parent: Pool = null
var weight: Int = 1
var minShare: Int = 0
var runningTasks: Int = 0
@@ -49,14 +49,14 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val numFailures = new Array[Int](numTasks)
val MAX_TASK_FAILURES = sched.maxFailures
- override def increaseRunningTasks(taskNum: Int): Unit = {
+ def increaseRunningTasks(taskNum: Int): Unit = {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
- override def decreaseRunningTasks(taskNum: Int): Unit = {
+ def decreaseRunningTasks(taskNum: Int): Unit = {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
@@ -132,7 +132,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
+ def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
case TaskState.FINISHED =>
@@ -152,7 +152,12 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
val index = info.index
val task = taskSet.tasks(index)
info.markSuccessful()
- val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
+ val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) match {
+ case directResult: DirectTaskResult[_] => directResult
+ case IndirectTaskResult(blockId) => {
+ throw new SparkException("Expect only DirectTaskResults when using LocalScheduler")
+ }
+ }
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
numFinished += 1
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 13b98a51a1..7852849ce5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -484,7 +484,7 @@ private[spark] class BlockManager(
for (loc <- locations) {
logDebug("Getting remote block " + blockId + " from " + loc)
val data = BlockManagerWorker.syncGetBlock(
- GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
+ GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
if (data != null) {
return Some(dataDeserialize(blockId, data))
}
@@ -495,10 +495,45 @@ private[spark] class BlockManager(
}
/**
+ * Get block from remote block managers as serialized bytes.
+ */
+ def getRemoteBytes(blockId: String): Option[ByteBuffer] = {
+ // TODO: As with getLocalBytes, this is very similar to getRemote and perhaps should be
+ // refactored.
+ if (blockId == null) {
+ throw new IllegalArgumentException("Block Id is null")
+ }
+ logDebug("Getting remote block " + blockId + " as bytes")
+
+ val locations = master.getLocations(blockId)
+ for (loc <- locations) {
+ logDebug("Getting remote block " + blockId + " from " + loc)
+ val data = BlockManagerWorker.syncGetBlock(
+ GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
+ if (data != null) {
+ return Some(data)
+ }
+ logDebug("The value of block " + blockId + " is null")
+ }
+ logDebug("Block " + blockId + " not found")
+ return None
+ }
+
+ /**
* Get a block from the block manager (either local or remote).
*/
def get(blockId: String): Option[Iterator[Any]] = {
- getLocal(blockId).orElse(getRemote(blockId))
+ val local = getLocal(blockId)
+ if (local.isDefined) {
+ logInfo("Found block %s locally".format(blockId))
+ return local
+ }
+ val remote = getRemote(blockId)
+ if (remote.isDefined) {
+ logInfo("Found block %s remotely".format(blockId))
+ return remote
+ }
+ None
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 3b3b2342fa..77a39c71ed 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils}
private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
extends BlockStore(blockManager) {
- case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false)
+ case class Entry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
- private var currentMemory = 0L
+ @volatile private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
@@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def remove(blockId: String): Boolean = {
entries.synchronized {
- val entry = entries.get(blockId)
+ val entry = entries.remove(blockId)
if (entry != null) {
- entries.remove(blockId)
currentMemory -= entry.size
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
@@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
override def clear() {
entries.synchronized {
entries.clear()
+ currentMemory = 0
}
logInfo("MemoryStore cleared")
}
@@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
- entries.synchronized { entries.put(blockId, entry) }
- currentMemory += size
+ entries.synchronized {
+ entries.put(blockId, entry)
+ currentMemory += size
+ }
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 3ec9760ed0..453394dfda 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -21,7 +21,7 @@ import scala.util.Random
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index d1868dcf78..42e9be6e19 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -26,8 +26,8 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
+import org.apache.spark.scheduler.TaskInfo
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Executors
import org.apache.spark.ui.UIUtils
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 3b428effaf..b39c0e9769 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -21,7 +21,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.{NodeSeq, Node}
-import org.apache.spark.scheduler.cluster.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode
import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils._
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 5d46f38a2a..eb3b4e8522 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -21,10 +21,8 @@ import scala.Seq
import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.TaskInfo
import org.apache.spark.executor.TaskMetrics
-import collection.mutable
+import org.apache.spark.scheduler._
/**
* Tracks task-level information to be displayed in the UI.
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 54e273fd8b..c1ee2f3d00 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -31,8 +31,9 @@ import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.{ExceptionFailure, SparkContext, Success}
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import collection.mutable
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index b3d3666944..06810d8dbc 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -21,8 +21,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.xml.Node
-import org.apache.spark.scheduler.Stage
-import org.apache.spark.scheduler.cluster.Schedulable
+import org.apache.spark.scheduler.{Schedulable, Stage}
import org.apache.spark.ui.UIUtils
/** Table showing list of pools */
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index a9969ab1c0..163a3746ea 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,12 +23,12 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
+import org.apache.spark.{ExceptionFailure}
+import org.apache.spark.executor.TaskMetrics
import org.apache.spark.ui.UIUtils._
import org.apache.spark.ui.Page._
import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{ExceptionFailure}
-import org.apache.spark.scheduler.cluster.TaskInfo
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
/** Page showing statistics and task list for a given stage */
private[spark] class StagePage(parent: JobProgressUI) {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 32776eaa25..07db8622da 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -22,8 +22,7 @@ import java.util.Date
import scala.xml.Node
import scala.collection.mutable.HashSet
-import org.apache.spark.scheduler.cluster.{SchedulingMode, TaskInfo}
-import org.apache.spark.scheduler.Stage
+import org.apache.spark.scheduler.{SchedulingMode, Stage, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index be215fc127..94ce50e964 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -77,6 +77,19 @@ private[spark] object Utils extends Logging {
return ois.readObject.asInstanceOf[T]
}
+ /** Deserialize a Long value (used for {@link org.apache.spark.api.python.PythonPartitioner}) */
+ def deserializeLongValue(bytes: Array[Byte]) : Long = {
+ // Note: we assume that we are given a Long value encoded in network (big-endian) byte order
+ var result = bytes(7) & 0xFFL
+ result = result + ((bytes(6) & 0xFFL) << 8)
+ result = result + ((bytes(5) & 0xFFL) << 16)
+ result = result + ((bytes(4) & 0xFFL) << 24)
+ result = result + ((bytes(3) & 0xFFL) << 32)
+ result = result + ((bytes(2) & 0xFFL) << 40)
+ result = result + ((bytes(1) & 0xFFL) << 48)
+ result + ((bytes(0) & 0xFFL) << 56)
+ }
+
/** Serialize via nested stream using specific serializer */
def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
val osWrapper = ser.serializeStream(new OutputStream {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index adc6ca94ff..25b9c3eb78 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -332,6 +332,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
}
exception.getMessage should endWith("result exceeded Akka frame size")
}
+
}
object DistributedSuite {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 096023f476..8fd1115207 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -25,7 +25,6 @@ import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.parallel.mutable
import org.apache.spark._
-import org.apache.spark.rdd.CoalescedRDDPartition
class RDDSuite extends FunSuite with SharedSparkContext {
@@ -321,6 +320,44 @@ class RDDSuite extends FunSuite with SharedSparkContext {
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
}
+ test("take") {
+ var nums = sc.makeRDD(Range(1, 1000), 1)
+ assert(nums.take(0).size === 0)
+ assert(nums.take(1) === Array(1))
+ assert(nums.take(3) === Array(1, 2, 3))
+ assert(nums.take(500) === (1 to 500).toArray)
+ assert(nums.take(501) === (1 to 501).toArray)
+ assert(nums.take(999) === (1 to 999).toArray)
+ assert(nums.take(1000) === (1 to 999).toArray)
+
+ nums = sc.makeRDD(Range(1, 1000), 2)
+ assert(nums.take(0).size === 0)
+ assert(nums.take(1) === Array(1))
+ assert(nums.take(3) === Array(1, 2, 3))
+ assert(nums.take(500) === (1 to 500).toArray)
+ assert(nums.take(501) === (1 to 501).toArray)
+ assert(nums.take(999) === (1 to 999).toArray)
+ assert(nums.take(1000) === (1 to 999).toArray)
+
+ nums = sc.makeRDD(Range(1, 1000), 100)
+ assert(nums.take(0).size === 0)
+ assert(nums.take(1) === Array(1))
+ assert(nums.take(3) === Array(1, 2, 3))
+ assert(nums.take(500) === (1 to 500).toArray)
+ assert(nums.take(501) === (1 to 501).toArray)
+ assert(nums.take(999) === (1 to 999).toArray)
+ assert(nums.take(1000) === (1 to 999).toArray)
+
+ nums = sc.makeRDD(Range(1, 1000), 1000)
+ assert(nums.take(0).size === 0)
+ assert(nums.take(1) === Array(1))
+ assert(nums.take(3) === Array(1, 2, 3))
+ assert(nums.take(500) === (1 to 500).toArray)
+ assert(nums.take(501) === (1 to 501).toArray)
+ assert(nums.take(999) === (1 to 999).toArray)
+ assert(nums.take(1000) === (1 to 999).toArray)
+ }
+
test("top with predefined ordering") {
val nums = Array.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 94f66c94c6..2f933246b0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -32,9 +32,7 @@ import org.apache.spark.{Dependency, ShuffleDependency, OneToOneDependency}
import org.apache.spark.{FetchFailed, Success, TaskEndReason}
import org.apache.spark.storage.{BlockManagerId, BlockManagerMaster}
-import org.apache.spark.scheduler.cluster.Pool
-import org.apache.spark.scheduler.cluster.SchedulingMode
-import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
/**
* Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index aac7c207cb..41a161e08a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,7 +29,9 @@ import org.apache.spark.SparkContext._
class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatchers {
- test("local metrics") {
+ // TODO: This test has a race condition since the DAGScheduler now reports results
+ // asynchronously. It needs to be updated for that patch.
+ ignore("local metrics") {
sc = new SparkContext("local[4]", "test")
val listener = new SaveStageInfo
sc.addSparkListener(listener)
@@ -43,6 +45,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
val d = sc.parallelize(1 to 1e4.toInt, 64).map{i => w(i)}
d.count
+ Thread.sleep(1000)
listener.stageInfos.size should be (1)
val d2 = d.map{i => w(i) -> i * 2}.setName("shuffle input 1")
@@ -54,6 +57,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
d4.collectAsMap
+ Thread.sleep(1000)
listener.stageInfos.size should be (4)
listener.stageInfos.foreach {stageInfo =>
//small test, so some tasks might take less than 1 millisecond, but average should be greater than 1 ms
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 1b50ce06b3..95d3553d91 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -43,16 +43,16 @@ class FakeTaskSetManager(
stageId = initStageId
name = "TaskSet_"+stageId
override val numTasks = initNumTasks
- tasksFinished = 0
+ tasksSuccessful = 0
- override def increaseRunningTasks(taskNum: Int) {
+ def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}
- override def decreaseRunningTasks(taskNum: Int) {
+ def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
@@ -79,7 +79,7 @@ class FakeTaskSetManager(
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
- if (tasksFinished + runningTasks < numTasks) {
+ if (tasksSuccessful + runningTasks < numTasks) {
increaseRunningTasks(1)
return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
}
@@ -92,8 +92,8 @@ class FakeTaskSetManager(
def taskFinished() {
decreaseRunningTasks(1)
- tasksFinished +=1
- if (tasksFinished == numTasks) {
+ tasksSuccessful +=1
+ if (tasksSuccessful == numTasks) {
parent.removeSchedulable(this)
}
}
@@ -114,7 +114,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
val taskSetQueue = rootPool.getSortedTaskSetQueue()
/* Just for Test*/
for (manager <- taskSetQueue) {
- logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
+ logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
+ manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
index ff70a2cdf0..80d0c5a5e9 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -40,6 +40,7 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
val startedTasks = new ArrayBuffer[Long]
val endedTasks = new mutable.HashMap[Long, TaskEndReason]
val finishedManagers = new ArrayBuffer[TaskSetManager]
+ val taskSetsFailed = new ArrayBuffer[String]
val executors = new mutable.HashMap[String, String] ++ liveExecutors
@@ -63,7 +64,9 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /*
def executorLost(execId: String) {}
- def taskSetFailed(taskSet: TaskSet, reason: String) {}
+ def taskSetFailed(taskSet: TaskSet, reason: String) {
+ taskSetsFailed += taskSet.id
+ }
}
def removeExecutor(execId: String): Unit = executors -= execId
@@ -101,7 +104,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
// Tell it the task has finished
- manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+ manager.handleSuccessfulTask(0, createTaskResult(0))
assert(sched.endedTasks(0) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -125,14 +128,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
// Finish the first two tasks
- manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
- manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
+ manager.handleSuccessfulTask(0, createTaskResult(0))
+ manager.handleSuccessfulTask(1, createTaskResult(1))
assert(sched.endedTasks(0) === Success)
assert(sched.endedTasks(1) === Success)
assert(!sched.finishedManagers.contains(manager))
// Finish the last task
- manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
+ manager.handleSuccessfulTask(2, createTaskResult(2))
assert(sched.endedTasks(2) === Success)
assert(sched.finishedManagers.contains(manager))
}
@@ -253,6 +256,47 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
}
+ test("task result lost") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Tell it the task has finished but the result was lost.
+ manager.handleFailedTask(0, TaskState.FINISHED, Some(TaskResultLost))
+ assert(sched.endedTasks(0) === TaskResultLost)
+
+ // Re-offer the host -- now we should get task 0 again.
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+ }
+
+ test("repeated failures lead to task set abortion") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
+ // after the last failure.
+ (0 until manager.MAX_TASK_FAILURES).foreach { index =>
+ val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
+ assert(offerResult != None,
+ "Expect resource offer on iteration %s to return a task".format(index))
+ assert(offerResult.get.index === 0)
+ manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
+ if (index < manager.MAX_TASK_FAILURES) {
+ assert(!sched.taskSetsFailed.contains(taskSet.id))
+ } else {
+ assert(sched.taskSetsFailed.contains(taskSet.id))
+ }
+ }
+ }
+
+
/**
* Utility method to create a TaskSet, potentially setting a particular sequence of preferred
* locations for each task (given as varargs) if this sequence is not empty.
@@ -267,7 +311,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo
new TaskSet(tasks, 0, 0, 0, null)
}
- def createTaskResult(id: Int): ByteBuffer = {
- ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
+ def createTaskResult(id: Int): DirectTaskResult[Int] = {
+ new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
new file mode 100644
index 0000000000..370a3eb0eb
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.nio.ByteBuffer
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
+import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
+
+/**
+ * Removes the TaskResult from the BlockManager before delegating to a normal TaskResultGetter.
+ *
+ * Used to test the case where a BlockManager evicts the task result (or dies) before the
+ * TaskResult is retrieved.
+ */
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+ extends TaskResultGetter(sparkEnv, scheduler) {
+ var removedResult = false
+
+ override def enqueueSuccessfulTask(
+ taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+ if (!removedResult) {
+ // Only remove the result once, since we'd like to test the case where the task eventually
+ // succeeds.
+ serializer.get().deserialize[TaskResult[_]](serializedData) match {
+ case IndirectTaskResult(blockId) =>
+ sparkEnv.blockManager.master.removeBlock(blockId)
+ case directResult: DirectTaskResult[_] =>
+ taskSetManager.abort("Internal error: expect only indirect results")
+ }
+ serializedData.rewind()
+ removedResult = true
+ }
+ super.enqueueSuccessfulTask(taskSetManager, tid, serializedData)
+ }
+}
+
+/**
+ * Tests related to handling task results (both direct and indirect).
+ */
+class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll
+ with LocalSparkContext {
+
+ override def beforeAll {
+ // Set the Akka frame size to be as small as possible (it must be an integer, so 1 is as small
+ // as we can make it) so the tests don't take too long.
+ System.setProperty("spark.akka.frameSize", "1")
+ }
+
+ before {
+ // Use local-cluster mode because results are returned differently when running with the
+ // LocalScheduler.
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ }
+
+ override def afterAll {
+ System.clearProperty("spark.akka.frameSize")
+ }
+
+ test("handling results smaller than Akka frame size") {
+ val result = sc.parallelize(Seq(1), 1).map(x => 2 * x).reduce((x, y) => x)
+ assert(result === 2)
+ }
+
+ test("handling results larger than Akka frame size") {
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
+ val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(akkaFrameSize).toArray)
+
+ val RESULT_BLOCK_ID = "taskresult_0"
+ assert(sc.env.blockManager.master.getLocations(RESULT_BLOCK_ID).size === 0,
+ "Expect result to be removed from the block manager.")
+ }
+
+ test("task retried if result missing from block manager") {
+ // If this test hangs, it's probably because no resource offers were made after the task
+ // failed.
+ val scheduler: ClusterScheduler = sc.taskScheduler match {
+ case clusterScheduler: ClusterScheduler =>
+ clusterScheduler
+ case _ =>
+ assert(false, "Expect local cluster to use ClusterScheduler")
+ throw new ClassCastException
+ }
+ scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.tcp.message-frame-size").toInt
+ val result = sc.parallelize(Seq(1), 1).map(x => 1.to(akkaFrameSize).toArray).reduce((x, y) => x)
+ assert(result === 1.to(akkaFrameSize).toArray)
+
+ // Make sure two tasks were run (one failed one, and a second retried one).
+ assert(scheduler.nextTaskId.get() === 2)
+ }
+}
+
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index f4e1d4e802..3764f4d1a0 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -25,6 +25,13 @@ import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
val startPort = 4040
+ val server = new Server(startPort)
+
+ Try { server.start() } match {
+ case Success(s) =>
+ case Failure(e) =>
+ // Either case server port is busy hence setup for test complete
+ }
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
// Allow some wiggle room in case ports on the machine are under contention
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index e2859caf58..4684c8c972 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
import com.google.common.base.Charsets
import com.google.common.io.Files
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
+import java.nio.{ByteBuffer, ByteOrder}
import org.scalatest.FunSuite
import org.apache.commons.io.FileUtils
import scala.util.Random
@@ -135,5 +136,15 @@ class UtilsSuite extends FunSuite {
FileUtils.deleteDirectory(tmpDir2)
}
+
+ test("deserialize long value") {
+ val testval : Long = 9730889947L
+ val bbuf = ByteBuffer.allocate(8)
+ assert(bbuf.hasArray)
+ bbuf.order(ByteOrder.BIG_ENDIAN)
+ bbuf.putLong(testval)
+ assert(bbuf.array.length === 8)
+ assert(Utils.deserializeLongValue(bbuf.array) === testval)
+ }
}
diff --git a/docs/_config.yml b/docs/_config.yml
index ad851673a5..02067f9750 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -3,8 +3,8 @@ markdown: kramdown
# These allow the documentation to be updated with nerw releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 0.8.0-SNAPSHOT
-SPARK_VERSION_SHORT: 0.8.0
+SPARK_VERSION: 0.9.0-incubating-SNAPSHOT
+SPARK_VERSION_SHORT: 0.9.0-SNAPSHOT
SCALA_VERSION: 2.10
MESOS_VERSION: 0.13.0
SPARK_ISSUE_TRACKER_URL: https://spark-project.atlassian.net
diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html
index 238ad26de0..0c1d657cde 100755
--- a/docs/_layouts/global.html
+++ b/docs/_layouts/global.html
@@ -6,7 +6,7 @@
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge,chrome=1">
- <title>{{ page.title }} - Spark {{site.SPARK_VERSION}} Documentation</title>
+ <title>{{ page.title }} - Spark {{site.SPARK_VERSION_SHORT}} Documentation</title>
<meta name="description" content="">
<link rel="stylesheet" href="css/bootstrap.min.css">
@@ -109,7 +109,7 @@
</ul>
</li>
</ul>
- <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION}}</span></p>-->
+ <!--<p class="navbar-text pull-right"><span class="version-text">v{{site.SPARK_VERSION_SHORT}}</span></p>-->
</div>
</div>
</div>
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index c611db0af4..30128ec45d 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -50,6 +50,7 @@ The command to launch the YARN Client is as follows:
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
+ --name <application_name> \
--queue <queue_name>
For example:
diff --git a/ec2/README b/ec2/README
index 0add81312c..433da37b4c 100644
--- a/ec2/README
+++ b/ec2/README
@@ -1,4 +1,4 @@
This folder contains a script, spark-ec2, for launching Spark clusters on
Amazon EC2. Usage instructions are available online at:
-http://spark-project.org/docs/latest/ec2-scripts.html
+http://spark.incubator.apache.org/docs/latest/ec2-scripts.html
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 6b7d202a88..65868b76b9 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -23,6 +23,7 @@ from __future__ import with_statement
import logging
import os
+import pipes
import random
import shutil
import subprocess
@@ -36,6 +37,9 @@ import boto
from boto.ec2.blockdevicemapping import BlockDeviceMapping, EBSBlockDeviceType
from boto import ec2
+class UsageError(Exception):
+ pass
+
# A URL prefix from which to fetch AMI information
AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
@@ -66,7 +70,7 @@ def parse_args():
"slaves across multiple (an additional $0.01/Gb for bandwidth" +
"between zones applies)")
parser.add_option("-a", "--ami", help="Amazon Machine Image ID to use")
- parser.add_option("-v", "--spark-version", default="0.7.3",
+ parser.add_option("-v", "--spark-version", default="0.8.0",
help="Version of Spark to use: 'X.Y.Z' or a specific git hash")
parser.add_option("--spark-git-repo",
default="https://github.com/mesos/spark",
@@ -103,11 +107,7 @@ def parse_args():
parser.print_help()
sys.exit(1)
(action, cluster_name) = args
- if opts.identity_file == None and action in ['launch', 'login', 'start']:
- print >> stderr, ("ERROR: The -i or --identity-file argument is " +
- "required for " + action)
- sys.exit(1)
-
+
# Boto config check
# http://boto.cloudhackers.com/en/latest/boto_config_tut.html
home_dir = os.getenv('HOME')
@@ -155,7 +155,7 @@ def is_active(instance):
# Return correct versions of Spark and Shark, given the supplied Spark version
def get_spark_shark_version(opts):
- spark_shark_map = {"0.7.3": "0.7.0"}
+ spark_shark_map = {"0.7.3": "0.7.1", "0.8.0": "0.8.0"}
version = opts.spark_version.replace("v", "")
if version not in spark_shark_map:
print >> stderr, "Don't know about Spark version: %s" % version
@@ -390,10 +390,18 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
master = master_nodes[0].public_dns_name
if deploy_ssh_key:
- print "Copying SSH key %s to master..." % opts.identity_file
- ssh(master, opts, 'mkdir -p ~/.ssh')
- scp(master, opts, opts.identity_file, '~/.ssh/id_rsa')
- ssh(master, opts, 'chmod 600 ~/.ssh/id_rsa')
+ print "Generating cluster's SSH key on master..."
+ key_setup = """
+ [ -f ~/.ssh/id_rsa ] ||
+ (ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa &&
+ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys)
+ """
+ ssh(master, opts, key_setup)
+ dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
+ print "Transferring cluster's SSH key to slaves..."
+ for slave in slave_nodes:
+ print slave.public_dns_name
+ ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
modules = ['spark', 'shark', 'ephemeral-hdfs', 'persistent-hdfs',
'mapreduce', 'spark-standalone']
@@ -535,18 +543,33 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
dest.write(text)
dest.close()
# rsync the whole directory over to the master machine
- command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
- "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
- subprocess.check_call(command, shell=True)
+ command = [
+ 'rsync', '-rv',
+ '-e', stringify_command(ssh_command(opts)),
+ "%s/" % tmp_dir,
+ "%s@%s:/" % (opts.user, active_master)
+ ]
+ subprocess.check_call(command)
# Remove the temp directory we created above
shutil.rmtree(tmp_dir)
-# Copy a file to a given host through scp, throwing an exception if scp fails
-def scp(host, opts, local_file, dest_file):
- subprocess.check_call(
- "scp -q -o StrictHostKeyChecking=no -i %s '%s' '%s@%s:%s'" %
- (opts.identity_file, local_file, opts.user, host, dest_file), shell=True)
+def stringify_command(parts):
+ if isinstance(parts, str):
+ return parts
+ else:
+ return ' '.join(map(pipes.quote, parts))
+
+
+def ssh_args(opts):
+ parts = ['-o', 'StrictHostKeyChecking=no']
+ if opts.identity_file is not None:
+ parts += ['-i', opts.identity_file]
+ return parts
+
+
+def ssh_command(opts):
+ return ['ssh'] + ssh_args(opts)
# Run a command on a host through ssh, retrying up to two times
@@ -556,18 +579,42 @@ def ssh(host, opts, command):
while True:
try:
return subprocess.check_call(
- "ssh -t -o StrictHostKeyChecking=no -i %s %s@%s '%s'" %
- (opts.identity_file, opts.user, host, command), shell=True)
+ ssh_command(opts) + ['-t', '%s@%s' % (opts.user, host), stringify_command(command)])
except subprocess.CalledProcessError as e:
if (tries > 2):
- raise e
- print "Couldn't connect to host {0}, waiting 30 seconds".format(e)
+ # If this was an ssh failure, provide the user with hints.
+ if e.returncode == 255:
+ raise UsageError("Failed to SSH to remote host {0}.\nPlease check that you have provided the correct --identity-file and --key-pair parameters and try again.".format(host))
+ else:
+ raise e
+ print >> stderr, "Error executing remote command, retrying after 30 seconds: {0}".format(e)
time.sleep(30)
tries = tries + 1
+def ssh_read(host, opts, command):
+ return subprocess.check_output(
+ ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
+def ssh_write(host, opts, command, input):
+ tries = 0
+ while True:
+ proc = subprocess.Popen(
+ ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)],
+ stdin=subprocess.PIPE)
+ proc.stdin.write(input)
+ proc.stdin.close()
+ status = proc.wait()
+ if status == 0:
+ break
+ elif (tries > 2):
+ raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
+ else:
+ print >> stderr, "Error {0} while executing remote command, retrying after 30 seconds".format(status)
+ time.sleep(30)
+ tries = tries + 1
+
# Gets a list of zones to launch instances in
def get_zones(conn, opts):
@@ -586,7 +633,7 @@ def get_partition(total, num_partitions, current_partitions):
return num_slaves_this_zone
-def main():
+def real_main():
(opts, action, cluster_name) = parse_args()
try:
conn = ec2.connect_to_region(opts.region)
@@ -669,11 +716,11 @@ def main():
conn, opts, cluster_name)
master = master_nodes[0].public_dns_name
print "Logging into master " + master + "..."
- proxy_opt = ""
+ proxy_opt = []
if opts.proxy_port != None:
- proxy_opt = "-D " + opts.proxy_port
- subprocess.check_call("ssh -o StrictHostKeyChecking=no -i %s %s %s@%s" %
- (opts.identity_file, proxy_opt, opts.user, master), shell=True)
+ proxy_opt = ['-D', opts.proxy_port]
+ subprocess.check_call(
+ ssh_command(opts) + proxy_opt + ['-t', "%s@%s" % (opts.user, master)])
elif action == "get-master":
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
@@ -715,6 +762,13 @@ def main():
sys.exit(1)
+def main():
+ try:
+ real_main()
+ except UsageError, e:
+ print >> stderr, "\nError:\n", e
+
+
if __name__ == "__main__":
logging.basicConfig()
main()
diff --git a/examples/pom.xml b/examples/pom.xml
index ca06a9ad8d..c6c9def5be 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -21,38 +21,46 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-examples</artifactId>
+ <artifactId>spark-examples_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Examples</name>
<url>http://spark.incubator.apache.org/</url>
+ <repositories>
+ <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
+ <repository>
+ <id>lib</id>
+ <url>file://${project.basedir}/lib</url>
+ </repository>
+ </repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming</artifactId>
+ <artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib</artifactId>
+ <artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel</artifactId>
+ <artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
@@ -72,6 +80,12 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka</artifactId>
+ <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -161,7 +175,7 @@
</goals>
<configuration>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
diff --git a/make-distribution.sh b/make-distribution.sh
index bffb19843c..32bbdb90a5 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -95,7 +95,7 @@ cp $FWDIR/assembly/target/scala*/*assembly*hadoop*.jar "$DISTDIR/jars/"
# Copy other things
mkdir "$DISTDIR"/conf
-cp "$FWDIR/conf/*.template" "$DISTDIR"/conf
+cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
cp -r "$FWDIR/bin" "$DISTDIR"
cp -r "$FWDIR/python" "$DISTDIR"
cp "$FWDIR/spark-class" "$DISTDIR"
diff --git a/mllib/pom.xml b/mllib/pom.xml
index f4190148b1..a57bddeff3 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib</artifactId>
+ <artifactId>spark-mllib_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project ML Library</name>
<url>http://spark.incubator.apache.org/</url>
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -48,12 +48,12 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.10</artifactId>
+ <artifactId>scalatest_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
- <artifactId>scalacheck_2.10</artifactId>
+ <artifactId>scalacheck_${scala-short.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/pom.xml b/pom.xml
index e9f90135dd..9278856e5a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Spark Project Parent POM</name>
<url>http://spark.incubator.apache.org/</url>
@@ -40,6 +40,7 @@
<connection>scm:git:git@github.com:apache/incubator-spark.git</connection>
<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-spark.git</developerConnection>
<url>scm:git:git@github.com:apache/incubator-spark.git</url>
+ <tag>HEAD</tag>
</scm>
<developers>
<developer>
@@ -558,7 +559,6 @@
<useZincServer>true</useZincServer>
<args>
<arg>-unchecked</arg>
- <arg>-optimise</arg>
<arg>-deprecation</arg>
</args>
<jvmArgs>
@@ -605,7 +605,7 @@
<junitxml>.</junitxml>
<filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
<argLine>-Xms64m -Xmx3g</argLine>
- <stderr/>
+ <stderr />
</configuration>
<executions>
<execution>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5d4250a53b..a734558142 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -79,9 +79,10 @@ object SparkBuild extends Build {
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.apache.spark",
- version := "0.8.0-SNAPSHOT",
+ version := "0.9.0-incubating-SNAPSHOT",
scalaVersion := "2.10.3",
- scalacOptions := Seq("-unchecked", "-optimize", "-deprecation", "-target:" + SCALAC_JVM_VERSION),
+ scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation",
+ "-target:" + SCALAC_JVM_VERSION),
javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
retrieveManaged := true,
@@ -96,6 +97,9 @@ object SparkBuild extends Build {
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
+ // also check the local Maven repository ~/.m2
+ resolvers ++= Seq(Resolver.file("Local Maven Repo", file(Path.userHome + "/.m2/repository"))),
+
// For Sonatype publishing
resolvers ++= Seq("sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
"sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/"),
@@ -149,6 +153,7 @@ object SparkBuild extends Build {
*/
+
libraryDependencies ++= Seq(
"io.netty" % "netty-all" % "4.0.0.CR1",
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
@@ -175,6 +180,7 @@ object SparkBuild extends Build {
val slf4jVersion = "1.7.2"
+ val excludeCglib = ExclusionRule(organization = "org.sonatype.sisu.inject")
val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
val excludeAsm = ExclusionRule(organization = "asm")
@@ -198,7 +204,6 @@ object SparkBuild extends Build {
"commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
"org.ow2.asm" % "asm" % "4.0",
"com.google.protobuf" % "protobuf-java" % "2.4.1",
- "de.javakaffee" % "kryo-serializers" % "0.22",
"com.typesafe.akka" %% "akka-remote" % "2.2.1" excludeAll(excludeNetty),
"com.typesafe.akka" %% "akka-slf4j" % "2.2.1" excludeAll(excludeNetty),
"net.liftweb" %% "lift-json" % "2.5.1" excludeAll(excludeNetty),
@@ -216,7 +221,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" %% "chill" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
- )
+ )
)
def rootSettings = sharedSettings ++ Seq(
@@ -246,6 +251,7 @@ object SparkBuild extends Build {
exclude("log4j","log4j")
exclude("org.apache.cassandra.deps", "avro")
excludeAll(excludeSnappy)
+ excludeAll(excludeCglib)
)
) ++ assemblySettings ++ extraAssemblySettings
@@ -285,10 +291,10 @@ object SparkBuild extends Build {
def yarnEnabledSettings = Seq(
libraryDependencies ++= Seq(
// Exclude rule required for all ?
- "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm),
- "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm)
+ "org.apache.hadoop" % "hadoop-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib),
+ "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJackson, excludeNetty, excludeAsm, excludeCglib)
)
)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7611b13e82..33dc865256 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -29,7 +29,7 @@ from threading import Thread
from pyspark import cloudpickle
from pyspark.serializers import batched, Batch, dump_pickle, load_pickle, \
- read_from_pickle_file
+ read_from_pickle_file, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -690,11 +690,13 @@ class RDD(object):
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
def add_shuffle_key(split, iterator):
+
buckets = defaultdict(list)
+
for (k, v) in iterator:
buckets[partitionFunc(k) % numPartitions].append((k, v))
for (split, items) in buckets.iteritems():
- yield str(split)
+ yield pack_long(split)
yield dump_pickle(Batch(items))
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
@@ -831,8 +833,8 @@ class RDD(object):
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
- filter_func = lambda tpl: len(tpl[1][0]) > 0 and len(tpl[1][1]) == 0
- map_func = lambda tpl: [(tpl[0], val) for val in tpl[1][0]]
+ filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+ map_func = lambda (key, vals): [(key, val) for val in vals[0]]
return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
def subtract(self, other, numPartitions=None):
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index fecacd1241..54fed1c9c7 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -67,6 +67,10 @@ def write_long(value, stream):
stream.write(struct.pack("!q", value))
+def pack_long(value):
+ return struct.pack("!q", value)
+
+
def read_int(stream):
length = stream.read(4)
if length == "":
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index dc205b306f..a475959090 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -35,7 +35,7 @@ print """Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
- /__ / .__/\_,_/_/ /_/\_\ version 0.8.0
+ /__ / .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
/_/
"""
print "Using Python version %s (%s, %s)" % (
diff --git a/repl-bin/pom.xml b/repl-bin/pom.xml
index 3685561501..c983ea5dfb 100644
--- a/repl-bin/pom.xml
+++ b/repl-bin/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl-bin</artifactId>
+ <artifactId>spark-repl-bin_${scala-short.version}</artifactId>
<packaging>pom</packaging>
<name>Spark Project REPL binary packaging</name>
<url>http://spark.incubator.apache.org/</url>
@@ -40,18 +40,18 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel</artifactId>
+ <artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl</artifactId>
+ <artifactId>spark-repl_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
@@ -89,7 +89,7 @@
</goals>
<configuration>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
diff --git a/repl/pom.xml b/repl/pom.xml
index a7b5e1f3c7..ff66493229 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-repl</artifactId>
+ <artifactId>spark-repl_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project REPL</name>
<url>http://spark.incubator.apache.org/</url>
@@ -39,18 +39,18 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-bagel</artifactId>
+ <artifactId>spark-bagel_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-mllib</artifactId>
+ <artifactId>spark-mllib_${scala-short.version}</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
@@ -103,14 +103,14 @@
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
- <property name="spark.classpath" refid="maven.test.classpath"/>
- <property environment="env"/>
+ <property name="spark.classpath" refid="maven.test.classpath" />
+ <property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
<condition>
<not>
<or>
- <isset property="env.SCALA_HOME"/>
- <isset property="env.SCALA_LIBRARY_PATH"/>
+ <isset property="env.SCALA_HOME" />
+ <isset property="env.SCALA_LIBRARY_PATH" />
</or>
</not>
</condition>
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 31596cc02f..21b1ba305d 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -24,7 +24,7 @@ trait SparkILoopInit {
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
- /___/ .__/\_,_/_/ /_/\_\ version 0.8.0
+ /___/ .__/\_,_/_/ /_/\_\ version 0.9.0-SNAPSHOT
/_/
""")
import Properties._
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 8367256004..3f2033f34a 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -21,12 +21,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming</artifactId>
+ <artifactId>spark-streaming_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Streaming</name>
<url>http://spark.incubator.apache.org/</url>
@@ -42,7 +42,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -58,6 +58,7 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka</artifactId>
<version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
diff --git a/tools/pom.xml b/tools/pom.xml
index 0933c75a7f..db87b54dec 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -20,12 +20,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-tools</artifactId>
+ <artifactId>spark-tools_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project Tools</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,12 +33,12 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming</artifactId>
+ <artifactId>spark-streaming_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index 47e27ee41c..7770cbb0cc 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -20,12 +20,12 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.8.0-SNAPSHOT</version>
+ <version>0.9.0-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-yarn</artifactId>
+ <artifactId>spark-yarn_${scala-short.version}</artifactId>
<packaging>jar</packaging>
<name>Spark Project YARN Support</name>
<url>http://spark.incubator.apache.org/</url>
@@ -33,7 +33,7 @@
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core</artifactId>
+ <artifactId>spark-core_${scala-short.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -97,7 +97,7 @@
</goals>
<configuration>
<transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 844c707834..076dd3c9b0 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -106,7 +106,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
logInfo("Setting up application submission context for ASM")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
appContext.setApplicationId(appId)
- appContext.setApplicationName("Spark")
+ appContext.setApplicationName(args.appName)
return appContext
}
@@ -224,8 +224,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add Xmx for am memory
JAVA_OPTS += "-Xmx" + amMemory + "m "
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
@@ -241,6 +241,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 "
JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 "
}
+
if (env.isDefinedAt("SPARK_JAVA_OPTS")) {
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index cd651904d2..c56dbd99ba 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -32,6 +32,7 @@ class ClientArguments(val args: Array[String]) {
var numWorkers = 2
var amQueue = System.getProperty("QUEUE", "default")
var amMemory: Int = 512
+ var appName: String = "Spark"
// TODO
var inputFormatInfo: List[InputFormatInfo] = null
@@ -78,6 +79,10 @@ class ClientArguments(val args: Array[String]) {
amQueue = value
args = tail
+ case ("--name") :: value :: tail =>
+ appName = value
+ args = tail
+
case Nil =>
if (userJar == null || userClass == null) {
printUsageAndExit(1)
@@ -108,6 +113,7 @@ class ClientArguments(val args: Array[String]) {
" --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" +
" --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
" --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
+ " --name NAME The name of your application (Default: Spark)\n" +
" --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')"
)
System.exit(exitCode)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6229167cb4..a60e8a3007 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
JAVA_OPTS += env("SPARK_JAVA_OPTS") + " "
}
- JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
+ JAVA_OPTS += " -Djava.io.tmpdir=" +
+ new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
+
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same