aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--conf/slaves2
-rw-r--r--core/pom.xml4
-rw-r--r--core/src/main/scala/spark/Aggregator.scala18
-rw-r--r--core/src/main/scala/spark/BlockStoreShuffleFetcher.scala11
-rw-r--r--core/src/main/scala/spark/Dependency.scala5
-rw-r--r--core/src/main/scala/spark/MapOutputTracker.scala62
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala162
-rw-r--r--core/src/main/scala/spark/Partitioner.scala2
-rw-r--r--core/src/main/scala/spark/RDD.scala37
-rw-r--r--core/src/main/scala/spark/ShuffleFetcher.scala5
-rw-r--r--core/src/main/scala/spark/SparkContext.scala24
-rw-r--r--core/src/main/scala/spark/SparkEnv.scala19
-rw-r--r--core/src/main/scala/spark/Utils.scala41
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala21
-rw-r--r--core/src/main/scala/spark/api/java/JavaRDDLike.scala6
-rw-r--r--core/src/main/scala/spark/api/python/PythonRDD.scala9
-rw-r--r--core/src/main/scala/spark/deploy/DeployMessage.scala1
-rw-r--r--core/src/main/scala/spark/deploy/JsonProtocol.scala108
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala5
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/IndexPage.scala15
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala6
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala4
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala11
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala4
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala12
-rw-r--r--core/src/main/scala/spark/rdd/BlockRDD.scala7
-rw-r--r--core/src/main/scala/spark/rdd/CartesianRDD.scala2
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala56
-rw-r--r--core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala36
-rw-r--r--core/src/main/scala/spark/rdd/MappedValuesRDD.scala34
-rw-r--r--core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala51
-rw-r--r--core/src/main/scala/spark/rdd/ShuffledRDD.scala33
-rw-r--r--core/src/main/scala/spark/rdd/SubtractedRDD.scala30
-rw-r--r--core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala30
-rw-r--r--core/src/main/scala/spark/rdd/ZippedRDD.scala23
-rw-r--r--core/src/main/scala/spark/scheduler/ActiveJob.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/DAGScheduler.scala183
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/JobLogger.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala15
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala17
-rw-r--r--core/src/main/scala/spark/scheduler/SparkListener.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/Stage.scala7
-rw-r--r--core/src/main/scala/spark/scheduler/Task.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/TaskLocation.scala34
-rw-r--r--core/src/main/scala/spark/scheduler/TaskResult.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala374
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala640
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala20
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala4
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala32
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala24
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalScheduler.scala2
-rw-r--r--core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala25
-rw-r--r--core/src/main/scala/spark/storage/BlockFetcherIterator.scala4
-rw-r--r--core/src/main/scala/spark/storage/BlockManager.scala60
-rw-r--r--core/src/main/scala/spark/storage/BlockManagerMasterActor.scala14
-rw-r--r--core/src/main/scala/spark/storage/DiskStore.scala4
-rw-r--r--core/src/main/scala/spark/storage/MemoryStore.scala6
-rw-r--r--core/src/main/scala/spark/storage/StorageUtils.scala4
-rw-r--r--core/src/main/scala/spark/ui/JettyUtils.scala7
-rw-r--r--core/src/main/scala/spark/ui/SparkUI.scala10
-rw-r--r--core/src/main/scala/spark/ui/exec/ExecutorsUI.scala10
-rw-r--r--core/src/main/scala/spark/ui/jobs/StagePage.scala12
-rw-r--r--core/src/main/scala/spark/ui/jobs/StageTable.scala4
-rw-r--r--core/src/main/scala/spark/ui/storage/IndexPage.scala4
-rw-r--r--core/src/main/scala/spark/ui/storage/RDDPage.scala14
-rw-r--r--core/src/main/scala/spark/util/Clock.scala29
-rw-r--r--core/src/main/scala/spark/util/MutablePair.scala36
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala2
-rw-r--r--core/src/test/scala/spark/JavaAPISuite.java2
-rw-r--r--core/src/test/scala/spark/MapOutputTrackerSuite.scala12
-rw-r--r--core/src/test/scala/spark/PairRDDFunctionsSuite.scala7
-rw-r--r--core/src/test/scala/spark/RDDSuite.scala2
-rw-r--r--core/src/test/scala/spark/ShuffleSuite.scala96
-rw-r--r--core/src/test/scala/spark/UtilsSuite.scala16
-rw-r--r--core/src/test/scala/spark/ZippedPartitionsSuite.scala2
-rw-r--r--core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala26
-rw-r--r--core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala)33
-rw-r--r--core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala273
-rw-r--r--core/src/test/scala/spark/scheduler/cluster/FakeTask.scala26
-rw-r--r--core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala)2
-rw-r--r--core/src/test/scala/spark/ui/UISuite.scala7
-rw-r--r--core/src/test/scala/spark/util/FakeClock.scala26
-rw-r--r--docs/configuration.md30
-rwxr-xr-xec2/spark_ec2.py2
-rw-r--r--pom.xml5
-rw-r--r--project/SparkBuild.scala1
-rw-r--r--python/pyspark/context.py14
-rw-r--r--python/pyspark/rdd.py4
-rw-r--r--python/pyspark/tests.py11
-rw-r--r--python/pyspark/worker.py13
-rw-r--r--python/test_support/userlib-0.1-py2.7.eggbin0 -> 1945 bytes
-rw-r--r--tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala8
-rw-r--r--yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala7
101 files changed, 1712 insertions, 1460 deletions
diff --git a/conf/slaves b/conf/slaves
index 6e315a8540..da0a01343d 100644
--- a/conf/slaves
+++ b/conf/slaves
@@ -1,2 +1,2 @@
-# A Spark Worker will be started on each of the machines listes below.
+# A Spark Worker will be started on each of the machines listed below.
localhost \ No newline at end of file
diff --git a/core/pom.xml b/core/pom.xml
index 2870906092..6627a87de1 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -103,10 +103,6 @@
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
- <groupId>net.liftweb</groupId>
- <artifactId>lift-json_2.9.2</artifactId>
- </dependency>
- <dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 136b4da61e..9af401986d 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -28,18 +28,18 @@ import scala.collection.JavaConversions._
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
case class Aggregator[K, V, C] (
- val createCombiner: V => C,
- val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C) {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C) {
- def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, v) <- iter) {
- val oldC = combiners.get(k)
+ for (kv <- iter) {
+ val oldC = combiners.get(kv._1)
if (oldC == null) {
- combiners.put(k, createCombiner(v))
+ combiners.put(kv._1, createCombiner(kv._2))
} else {
- combiners.put(k, mergeValue(oldC, v))
+ combiners.put(kv._1, mergeValue(oldC, kv._2))
}
}
combiners.iterator
@@ -47,7 +47,7 @@ case class Aggregator[K, V, C] (
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, c) <- iter) {
+ iter.foreach { case(k, c) =>
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, c)
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 8f6953b1f5..1ec95ed9b8 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -28,8 +28,9 @@ import spark.util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
- override def fetch[K, V](
- shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer) = {
+ override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
+ : Iterator[T] =
+ {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
@@ -49,12 +50,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
}
- def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
+ def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
- block.asInstanceOf[Iterator[(K, V)]]
+ block.asInstanceOf[Iterator[T]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
@@ -73,7 +74,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
- CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
+ CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index d17e70a4fa..d5a9606570 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -39,16 +39,15 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
/**
* Represents a dependency on the output of a shuffle stage.
- * @param shuffleId the shuffle id
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializerClass class name of the serializer to use
*/
class ShuffleDependency[K, V](
- @transient rdd: RDD[(K, V)],
+ @transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializerClass: String = null)
- extends Dependency(rdd) {
+ extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
}
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
index 2c417e31db..0cd0341a72 100644
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/spark/MapOutputTracker.scala
@@ -64,11 +64,11 @@ private[spark] class MapOutputTracker extends Logging {
// Incremented every time a fetch fails so that client nodes know to clear
// their cache of map output locations if this happens.
- private var generation: Long = 0
- private val generationLock = new java.lang.Object
+ private var epoch: Long = 0
+ private val epochLock = new java.lang.Object
// Cache a serialized version of the output statuses for each shuffle to send them out faster
- var cacheGeneration = generation
+ var cacheEpoch = epoch
private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
@@ -108,10 +108,10 @@ private[spark] class MapOutputTracker extends Logging {
def registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
- changeGeneration: Boolean = false) {
+ changeEpoch: Boolean = false) {
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
- if (changeGeneration) {
- incrementGeneration()
+ if (changeEpoch) {
+ incrementEpoch()
}
}
@@ -124,7 +124,7 @@ private[spark] class MapOutputTracker extends Logging {
array(mapId) = null
}
}
- incrementGeneration()
+ incrementEpoch()
} else {
throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
}
@@ -206,58 +206,58 @@ private[spark] class MapOutputTracker extends Logging {
trackerActor = null
}
- // Called on master to increment the generation number
- def incrementGeneration() {
- generationLock.synchronized {
- generation += 1
- logDebug("Increasing generation to " + generation)
+ // Called on master to increment the epoch number
+ def incrementEpoch() {
+ epochLock.synchronized {
+ epoch += 1
+ logDebug("Increasing epoch to " + epoch)
}
}
- // Called on master or workers to get current generation number
- def getGeneration: Long = {
- generationLock.synchronized {
- return generation
+ // Called on master or workers to get current epoch number
+ def getEpoch: Long = {
+ epochLock.synchronized {
+ return epoch
}
}
- // Called on workers to update the generation number, potentially clearing old outputs
- // because of a fetch failure. (Each Mesos task calls this with the latest generation
+ // Called on workers to update the epoch number, potentially clearing old outputs
+ // because of a fetch failure. (Each worker task calls this with the latest epoch
// number on the master at the time it was created.)
- def updateGeneration(newGen: Long) {
- generationLock.synchronized {
- if (newGen > generation) {
- logInfo("Updating generation to " + newGen + " and clearing cache")
+ def updateEpoch(newEpoch: Long) {
+ epochLock.synchronized {
+ if (newEpoch > epoch) {
+ logInfo("Updating epoch to " + newEpoch + " and clearing cache")
// mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
mapStatuses.clear()
- generation = newGen
+ epoch = newEpoch
}
}
}
def getSerializedLocations(shuffleId: Int): Array[Byte] = {
var statuses: Array[MapStatus] = null
- var generationGotten: Long = -1
- generationLock.synchronized {
- if (generation > cacheGeneration) {
+ var epochGotten: Long = -1
+ epochLock.synchronized {
+ if (epoch > cacheEpoch) {
cachedSerializedStatuses.clear()
- cacheGeneration = generation
+ cacheEpoch = epoch
}
cachedSerializedStatuses.get(shuffleId) match {
case Some(bytes) =>
return bytes
case None =>
statuses = mapStatuses(shuffleId)
- generationGotten = generation
+ epochGotten = epoch
}
}
// If we got here, we failed to find the serialized locations in the cache, so we pulled
// out a snapshot of the locations as "locs"; let's serialize and return that
val bytes = serializeStatuses(statuses)
logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
- // Add them into the table only if the generation hasn't changed while we were working
- generationLock.synchronized {
- if (generation == generationGotten) {
+ // Add them into the table only if the epoch hasn't changed while we were working
+ epochLock.synchronized {
+ if (epoch == epochGotten) {
cachedSerializedStatuses(shuffleId) = bytes
}
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 6701f24ff9..cc1285dd95 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,9 +21,8 @@ import java.nio.ByteBuffer
import java.util.{Date, HashMap => JHashMap}
import java.text.SimpleDateFormat
-import scala.collection.Map
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
@@ -51,8 +50,7 @@ import spark.Partitioner._
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
@@ -86,17 +84,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
- self.mapPartitions(aggregator.combineValuesByKey(_), true)
+ self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
} else if (mapSideCombine) {
- val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
- val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
- partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
+ val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
+ .setSerializer(serializerClass)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
- values.mapPartitions(aggregator.combineValuesByKey(_), true)
+ val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+ values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
}
}
@@ -168,7 +167,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
- for ((k, v) <- iter) {
+ iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
@@ -176,11 +175,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
- for ((k, v) <- m2) {
+ m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
- return m1
+ m1
}
self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -234,31 +233,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
/**
- * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
- * is true, Spark will group values of the same key together on the map side before the
- * repartitioning, to only send each key over the network once. If a large number of
- * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
- * be set to true.
+ * Return a copy of the RDD partitioned using the specified partitioner.
*/
- def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
- if (getKeyClass().isArray) {
- if (mapSideCombine) {
- throw new SparkException("Cannot use map-side combining with array keys.")
- }
- if (partitioner.isInstanceOf[HashPartitioner]) {
- throw new SparkException("Default partitioner cannot partition array keys.")
- }
- }
- if (mapSideCombine) {
- def createCombiner(v: V) = ArrayBuffer(v)
- def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
- def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2
- val bufs = combineByKey[ArrayBuffer[V]](
- createCombiner _, mergeValue _, mergeCombiners _, partitioner)
- bufs.flatMapValues(buf => buf)
- } else {
- new ShuffledRDD[K, V](self, partitioner)
+ def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+ if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
}
+ new ShuffledRDD[K, V, (K, V)](self, partitioner)
}
/**
@@ -267,9 +248,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
@@ -280,13 +260,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- if (ws.isEmpty) {
- vs.iterator.map(v => (v, None))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
- }
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ if (ws.isEmpty) {
+ vs.iterator.map(v => (v, None))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+ }
}
}
@@ -298,13 +277,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- if (vs.isEmpty) {
- ws.iterator.map(w => (None, w))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
- }
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ if (vs.isEmpty) {
+ ws.iterator.map(w => (None, w))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+ }
}
}
@@ -396,7 +374,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
- def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+ def collectAsMap(): Map[K, V] = {
+ val data = self.toArray()
+ val map = new mutable.HashMap[K, V]
+ map.sizeHint(data.length)
+ data.foreach { case (k, v) => map.put(k, v) }
+ map
+ }
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
@@ -424,13 +408,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+ prfs.mapValues { case Seq(vs, ws) =>
+ (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
@@ -443,15 +424,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]],
- other1.asInstanceOf[RDD[(K, _)]],
- other2.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ prfs.mapValues { case Seq(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
}
}
@@ -682,7 +658,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
+ def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -721,54 +697,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
}
-/**
- * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
- */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
- extends Logging
- with Serializable {
-
- /**
- * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
- * `collect` or `save` on the resulting RDD will return or output an ordered list of records
- * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
- * order of the keys).
- */
- def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
- val shuffled =
- new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
- shuffled.mapPartitions(iter => {
- val buf = iter.toArray
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }, true)
- }
-}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) =
- firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
-}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) = {
- firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
- }
-}
private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 6035bc075e..65da8235d7 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -84,7 +84,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
*/
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
partitions: Int,
- @transient rdd: RDD[(K,V)],
+ @transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 160b3e9d83..25a6951732 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,9 +31,8 @@ import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-import spark.api.java.JavaRDD
-import spark.broadcast.Broadcast
import spark.Partitioner._
+import spark.api.java.JavaRDD
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
@@ -221,8 +220,8 @@ abstract class RDD[T: ClassManifest](
}
/**
- * Get the preferred location of a split, taking into account whether the
- * RDD is checkpointed or not.
+ * Get the preferred locations of a partition (as hostnames), taking into account whether the
+ * RDD is checkpointed.
*/
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
@@ -287,7 +286,10 @@ abstract class RDD[T: ClassManifest](
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
- new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ new CoalescedRDD(
+ new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+ new HashPartitioner(numPartitions)),
+ numPartitions).keys
} else {
new CoalescedRDD(this, numPartitions)
}
@@ -302,8 +304,8 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- var multiplier = 3.0
- var initialCount = this.count()
+ val multiplier = 3.0
+ val initialCount = this.count()
var maxSelected = 0
if (num < 0) {
@@ -515,22 +517,19 @@ abstract class RDD[T: ClassManifest](
* *same number of partitions*, but does *not* require them to have the same number
* of elements in each partition.
*/
- def zipPartitions[B: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B]) => Iterator[V],
- rdd2: RDD[B]): RDD[V] =
+ def zipPartitions[B: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B])
+ (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C])
+ (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest](
- f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V],
- rdd2: RDD[B],
- rdd3: RDD[C],
- rdd4: RDD[D]): RDD[V] =
+ def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+ (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+ (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index dcced035e7..a6839cf7a4 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -22,12 +22,13 @@ import spark.serializer.Serializer
private[spark] abstract class ShuffleFetcher {
+
/**
* Fetch the shuffle outputs for a given ShuffleDependency.
* @return An iterator over the elements of the fetched shuffle outputs.
*/
- def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
- serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[(K,V)]
+ def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
+ serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
/** Stop the fetcher */
def stop() {}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index f020b2554b..fdd2dfa810 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -20,19 +20,14 @@ package spark
import java.io._
import java.net.URI
import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
-import akka.actor.Actor._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -54,23 +49,22 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
import spark.deploy.LocalSparkCluster
import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
+import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
+ OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
- SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+ SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI}
-import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -836,11 +830,11 @@ class SparkContext(
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
- private var nextShuffleId = new AtomicInteger(0)
+ private val nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
- private var nextRddId = new AtomicInteger(0)
+ private val nextRddId = new AtomicInteger(0)
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
@@ -889,7 +883,7 @@ object SparkContext {
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) =
- new OrderedRDDFunctions(rdd)
+ new OrderedRDDFunctions[K, V, (K, V)](rdd)
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 5f71df33b6..1f66e9cc7f 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -55,11 +55,7 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
- val metricsSystem: MetricsSystem,
- // To be set only as part of initialization of SparkContext.
- // (executorId, defaultHostPort) => executorHostPort
- // If executorId is NOT found, return defaultHostPort
- var executorIdToHostPort: Option[(String, String) => String]) {
+ val metricsSystem: MetricsSystem) {
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -97,16 +93,6 @@ class SparkEnv (
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
}
}
-
- def resolveExecutorIdToHostPort(executorId: String, defaultHostPort: String): String = {
- val env = SparkEnv.get
- if (env.executorIdToHostPort.isEmpty) {
- // default to using host, not host port. Relevant to non cluster modes.
- return defaultHostPort
- }
-
- env.executorIdToHostPort.get(executorId, defaultHostPort)
- }
}
object SparkEnv extends Logging {
@@ -250,7 +236,6 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- metricsSystem,
- None)
+ metricsSystem)
}
}
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 7ea9b0c28a..bb8aad3f4c 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -394,44 +394,17 @@ private object Utils extends Logging {
retval
}
-/*
- // Used by DEBUG code : remove when all testing done
- private val ipPattern = Pattern.compile("^[0-9]+(\\.[0-9]+)*$")
def checkHost(host: String, message: String = "") {
- // Currently catches only ipv4 pattern, this is just a debugging tool - not rigourous !
- // if (host.matches("^[0-9]+(\\.[0-9]+)*$")) {
- if (ipPattern.matcher(host).matches()) {
- Utils.logErrorWithStack("Unexpected to have host " + host + " which matches IP pattern. Message " + message)
- }
- if (Utils.parseHostPort(host)._2 != 0){
- Utils.logErrorWithStack("Unexpected to have host " + host + " which has port in it. Message " + message)
- }
+ assert(host.indexOf(':') == -1, message)
}
- // Used by DEBUG code : remove when all testing done
def checkHostPort(hostPort: String, message: String = "") {
- val (host, port) = Utils.parseHostPort(hostPort)
- checkHost(host)
- if (port <= 0){
- Utils.logErrorWithStack("Unexpected to have port " + port + " which is not valid in " + hostPort + ". Message " + message)
- }
+ assert(hostPort.indexOf(':') != -1, message)
}
// Used by DEBUG code : remove when all testing done
def logErrorWithStack(msg: String) {
try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
- // temp code for debug
- System.exit(-1)
- }
-*/
-
- // Once testing is complete in various modes, replace with this ?
- def checkHost(host: String, message: String = "") {}
- def checkHostPort(hostPort: String, message: String = "") {}
-
- // Used by DEBUG code : remove when all testing done
- def logErrorWithStack(msg: String) {
- try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
}
// Typically, this will be of order of number of nodes in cluster
@@ -518,9 +491,9 @@ private object Utils extends Logging {
}
/**
- * Convert a memory quantity in bytes to a human-readable string such as "4.0 MB".
+ * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
*/
- def memoryBytesToString(size: Long): String = {
+ def bytesToString(size: Long): String = {
val TB = 1L << 40
val GB = 1L << 30
val MB = 1L << 20
@@ -563,10 +536,10 @@ private object Utils extends Logging {
}
/**
- * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
+ * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
*/
- def memoryMegabytesToString(megabytes: Long): String = {
- memoryBytesToString(megabytes * 1024L * 1024L)
+ def megabytesToString(megabytes: Long): String = {
+ bytesToString(megabytes * 1024L * 1024L)
}
/**
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index ff12e8b76c..effe6e5e0d 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -30,17 +30,18 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.OrderedRDDFunctions
-import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
import spark.Partitioner._
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
@@ -253,11 +254,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
fromRDD(rdd.subtract(other, p))
/**
- * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
- * is true, Spark will group values of the same key together on the map side before the
- * repartitioning, to only send each key over the network once. If a large number of
- * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
- * be set to true.
+ * Return a copy of the RDD partitioned using the specified partitioner.
*/
def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] =
fromRDD(rdd.partitionBy(partitioner))
@@ -567,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def compare(b: K) = comp.compare(a, b)
}
implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
- fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
+ fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}
/**
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index e0255ed23e..2c2b138f16 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -207,12 +207,12 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* of elements in each partition.
*/
def zipPartitions[U, V](
- f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V],
- other: JavaRDDLike[U, _]): JavaRDD[V] = {
+ other: JavaRDDLike[U, _],
+ f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = {
def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator(
f.apply(asJavaIterator(x), asJavaIterator(y)).iterator())
JavaRDD.fromRDD(
- rdd.zipPartitions(fn, other.rdd)(other.classManifest, f.elementType()))(f.elementType())
+ rdd.zipPartitions(other.rdd)(fn)(other.classManifest, f.elementType()))(f.elementType())
}
// Actions (launch a job to return a value to the user program)
diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 2dd79f7100..49671437d0 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -33,6 +33,7 @@ private[spark] class PythonRDD[T: ClassManifest](
parent: RDD[T],
command: Seq[String],
envVars: JMap[String, String],
+ pythonIncludes: JList[String],
preservePartitoning: Boolean,
pythonExec: String,
broadcastVars: JList[Broadcast[Array[Byte]]],
@@ -44,10 +45,11 @@ private[spark] class PythonRDD[T: ClassManifest](
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
def this(parent: RDD[T], command: String, envVars: JMap[String, String],
+ pythonIncludes: JList[String],
preservePartitoning: Boolean, pythonExec: String,
broadcastVars: JList[Broadcast[Array[Byte]]],
accumulator: Accumulator[JList[Array[Byte]]]) =
- this(parent, PipedRDD.tokenize(command), envVars, preservePartitoning, pythonExec,
+ this(parent, PipedRDD.tokenize(command), envVars, pythonIncludes, preservePartitoning, pythonExec,
broadcastVars, accumulator)
override def getPartitions = parent.partitions
@@ -79,6 +81,11 @@ private[spark] class PythonRDD[T: ClassManifest](
dataOut.writeInt(broadcast.value.length)
dataOut.write(broadcast.value)
}
+ // Python includes (*.zip and *.egg files)
+ dataOut.writeInt(pythonIncludes.length)
+ for (f <- pythonIncludes) {
+ PythonRDD.writeAsPickle(f, dataOut)
+ }
dataOut.flush()
// Serialized user code
for (elem <- command) {
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 31861f3ac2..0db13ffc98 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -80,6 +80,7 @@ private[deploy] object DeployMessages {
case class RegisteredApplication(appId: String) extends DeployMessage
+ // TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
Utils.checkHostPort(hostPort, "Required hostport")
}
diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala
index bd1db7c294..6b71b953dd 100644
--- a/core/src/main/scala/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala
@@ -17,7 +17,7 @@
package spark.deploy
-import net.liftweb.json.JsonDSL._
+import scala.util.parsing.json.{JSONArray, JSONObject, JSONType}
import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import spark.deploy.master.{ApplicationInfo, WorkerInfo}
@@ -25,61 +25,63 @@ import spark.deploy.worker.ExecutorRunner
private[spark] object JsonProtocol {
- def writeWorkerInfo(obj: WorkerInfo) = {
- ("id" -> obj.id) ~
- ("host" -> obj.host) ~
- ("port" -> obj.port) ~
- ("webuiaddress" -> obj.webUiAddress) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed)
- }
- def writeApplicationInfo(obj: ApplicationInfo) = {
- ("starttime" -> obj.startTime) ~
- ("id" -> obj.id) ~
- ("name" -> obj.desc.name) ~
- ("cores" -> obj.desc.maxCores) ~
- ("user" -> obj.desc.user) ~
- ("memoryperslave" -> obj.desc.memoryPerSlave) ~
- ("submitdate" -> obj.submitDate.toString)
- }
+ def writeWorkerInfo(obj: WorkerInfo): JSONType = JSONObject(Map(
+ "id" -> obj.id,
+ "host" -> obj.host,
+ "port" -> obj.port,
+ "webuiaddress" -> obj.webUiAddress,
+ "cores" -> obj.cores,
+ "coresused" -> obj.coresUsed,
+ "memory" -> obj.memory,
+ "memoryused" -> obj.memoryUsed,
+ "state" -> obj.state.toString
+ ))
- def writeApplicationDescription(obj: ApplicationDescription) = {
- ("name" -> obj.name) ~
- ("cores" -> obj.maxCores) ~
- ("memoryperslave" -> obj.memoryPerSlave) ~
- ("user" -> obj.user)
- }
+ def writeApplicationInfo(obj: ApplicationInfo): JSONType = JSONObject(Map(
+ "starttime" -> obj.startTime,
+ "id" -> obj.id,
+ "name" -> obj.desc.name,
+ "cores" -> obj.desc.maxCores,
+ "user" -> obj.desc.user,
+ "memoryperslave" -> obj.desc.memoryPerSlave,
+ "submitdate" -> obj.submitDate.toString
+ ))
- def writeExecutorRunner(obj: ExecutorRunner) = {
- ("id" -> obj.execId) ~
- ("memory" -> obj.memory) ~
- ("appid" -> obj.appId) ~
- ("appdesc" -> writeApplicationDescription(obj.appDesc))
- }
+ def writeApplicationDescription(obj: ApplicationDescription): JSONType = JSONObject(Map(
+ "name" -> obj.name,
+ "cores" -> obj.maxCores,
+ "memoryperslave" -> obj.memoryPerSlave,
+ "user" -> obj.user
+ ))
- def writeMasterState(obj: MasterStateResponse) = {
- ("url" -> ("spark://" + obj.uri)) ~
- ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
- ("cores" -> obj.workers.map(_.cores).sum) ~
- ("coresused" -> obj.workers.map(_.coresUsed).sum) ~
- ("memory" -> obj.workers.map(_.memory).sum) ~
- ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
- ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
- }
+ def writeExecutorRunner(obj: ExecutorRunner): JSONType = JSONObject(Map(
+ "id" -> obj.execId,
+ "memory" -> obj.memory,
+ "appid" -> obj.appId,
+ "appdesc" -> writeApplicationDescription(obj.appDesc)
+ ))
- def writeWorkerState(obj: WorkerStateResponse) = {
- ("id" -> obj.workerId) ~
- ("masterurl" -> obj.masterUrl) ~
- ("masterwebuiurl" -> obj.masterWebUiUrl) ~
- ("cores" -> obj.cores) ~
- ("coresused" -> obj.coresUsed) ~
- ("memory" -> obj.memory) ~
- ("memoryused" -> obj.memoryUsed) ~
- ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
- ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
- }
+ def writeMasterState(obj: MasterStateResponse): JSONType = JSONObject(Map(
+ "url" -> ("spark://" + obj.uri),
+ "workers" -> obj.workers.toList.map(writeWorkerInfo),
+ "cores" -> obj.workers.map(_.cores).sum,
+ "coresused" -> obj.workers.map(_.coresUsed).sum,
+ "memory" -> obj.workers.map(_.memory).sum,
+ "memoryused" -> obj.workers.map(_.memoryUsed).sum,
+ "activeapps" -> JSONArray(obj.activeApps.toList.map(writeApplicationInfo)),
+ "completedapps" -> JSONArray(obj.completedApps.toList.map(writeApplicationInfo))
+ ))
+
+ def writeWorkerState(obj: WorkerStateResponse): JSONType = JSONObject(Map(
+ "id" -> obj.workerId,
+ "masterurl" -> obj.masterUrl,
+ "masterwebuiurl" -> obj.masterWebUiUrl,
+ "cores" -> obj.cores,
+ "coresused" -> obj.coresUsed,
+ "memory" -> obj.memory,
+ "memoryused" -> obj.memoryUsed,
+ "executors" -> JSONArray(obj.executors.toList.map(writeExecutorRunner)),
+ "finishedexecutors" -> JSONArray(obj.finishedExecutors.toList.map(writeExecutorRunner))
+ ))
}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 152cb2887a..04af5e149c 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -96,7 +96,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
override def receive = {
case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
- host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
+ host, workerPort, cores, Utils.megabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 405a1ec3a6..494a9b914d 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -17,6 +17,7 @@
package spark.deploy.master.ui
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
@@ -25,8 +26,6 @@ import akka.util.duration._
import javax.servlet.http.HttpServletRequest
-import net.liftweb.json.JsonAST.JValue
-
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import spark.deploy.JsonProtocol
import spark.deploy.master.ExecutorInfo
@@ -37,7 +36,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
implicit val timeout = parent.timeout
/** Executor details for a particular application */
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
index 47936e2bad..28e421e3bc 100644
--- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala
@@ -19,14 +19,13 @@ package spark.deploy.master.ui
import javax.servlet.http.HttpServletRequest
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import net.liftweb.json.JsonAST.JValue
-
import spark.Utils
import spark.deploy.DeployWebUI
import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
@@ -38,7 +37,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
implicit val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeMasterState(state)
@@ -70,8 +69,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total,
{state.workers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory:</strong>
- {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total,
- {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
+ {Utils.megabytesToString(state.workers.map(_.memory).sum)} Total,
+ {Utils.megabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Applications:</strong>
{state.activeApps.size} Running,
{state.completedApps.size} Completed </li>
@@ -116,8 +115,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td>{worker.state}</td>
<td>{worker.cores} ({worker.coresUsed} Used)</td>
<td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}>
- {Utils.memoryMegabytesToString(worker.memory)}
- ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used)
+ {Utils.megabytesToString(worker.memory)}
+ ({Utils.megabytesToString(worker.memoryUsed)} Used)
</td>
</tr>
}
@@ -135,7 +134,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
- {Utils.memoryMegabytesToString(app.desc.memoryPerSlave)}
+ {Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index f661accd2f..5e53d95ac2 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -40,13 +40,11 @@ private[spark] class ExecutorRunner(
val memory: Int,
val worker: ActorRef,
val workerId: String,
- val hostPort: String,
+ val host: String,
val sparkHome: File,
val workDir: File)
extends Logging {
- Utils.checkHostPort(hostPort, "Expected hostport")
-
val fullId = appId + "/" + execId
var workerThread: Thread = null
var process: Process = null
@@ -92,7 +90,7 @@ private[spark] class ExecutorRunner(
/** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case "{{EXECUTOR_ID}}" => execId.toString
- case "{{HOSTNAME}}" => Utils.parseHostPort(hostPort)._1
+ case "{{HOSTNAME}}" => host
case "{{CORES}}" => cores.toString
case other => other
}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 0b5013b864..053ac55226 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -96,7 +96,7 @@ private[spark] class Worker(
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
- host, port, cores, Utils.memoryMegabytesToString(memory)))
+ host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
@@ -132,7 +132,7 @@ private[spark] class Worker(
case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(
- appId, execId, appDesc, cores_, memory_, self, workerId, host + ":" + port, new File(execSparkHome_), workDir)
+ appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index 700eb22d96..02993d58a0 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -19,14 +19,13 @@ package spark.deploy.worker.ui
import javax.servlet.http.HttpServletRequest
+import scala.util.parsing.json.JSONType
import scala.xml.Node
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
-import net.liftweb.json.JsonAST.JValue
-
import spark.Utils
import spark.deploy.JsonProtocol
import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
@@ -39,7 +38,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val worker = parent.worker
val timeout = parent.timeout
- def renderJson(request: HttpServletRequest): JValue = {
+ def renderJson(request: HttpServletRequest): JSONType = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
@@ -65,8 +64,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
Master URL:</strong> {workerState.masterUrl}
</li>
<li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li>
- <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)}
- ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li>
+ <li><strong>Memory:</strong> {Utils.megabytesToString(workerState.memory)}
+ ({Utils.megabytesToString(workerState.memoryUsed)} Used)</li>
</ul>
<p><a href={workerState.masterWebUiUrl}>Back to Master</a></p>
</div>
@@ -97,7 +96,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
<td>{executor.execId}</td>
<td>{executor.cores}</td>
<td sorttable_customkey={executor.memory.toString}>
- {Utils.memoryMegabytesToString(executor.memory)}
+ {Utils.megabytesToString(executor.memory)}
</td>
<td>
<ul class="unstyled">
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 22295069dc..717619f80d 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -114,7 +114,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
.format(appId, executorId, logType, math.max(startByte-byteLength, 0),
byteLength)}>
<button type="button" class="btn btn-default">
- Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}
+ Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
</a>
}
@@ -129,7 +129,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
format(appId, executorId, logType, endByte, byteLength)}>
<button type="button" class="btn btn-default">
- Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}
+ Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))}
</button>
</a>
}
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 05a960d7c5..036c7191ad 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -32,8 +32,12 @@ import spark._
/**
* The Mesos executor for Spark.
*/
-private[spark] class Executor(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) extends Logging {
-
+private[spark] class Executor(
+ executorId: String,
+ slaveHostname: String,
+ properties: Seq[(String, String)])
+ extends Logging
+{
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -125,8 +129,8 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
updateDependencies(taskFiles, taskJars)
val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
attemptedTask = Some(task)
- logInfo("Its generation is " + task.generation)
- env.mapOutputTracker.updateGeneration(task.generation)
+ logInfo("Its epoch is " + task.epoch)
+ env.mapOutputTracker.updateEpoch(task.epoch)
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 0ebb722d73..03800584ae 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -28,13 +28,12 @@ private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
- @transient lazy val locations_ = BlockManager.blockIdsToExecutorLocations(blockIds, SparkEnv.get)
+ @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
}).toArray
-
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val blockManager = SparkEnv.get.blockManager
val blockId = split.asInstanceOf[BlockRDDPartition].blockId
@@ -45,8 +44,8 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
}
}
- override def getPreferredLocations(split: Partition): Seq[String] =
+ override def getPreferredLocations(split: Partition): Seq[String] = {
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
-
+ }
}
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 150e5bca29..91b3e69d6f 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -64,7 +64,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
override def getPreferredLocations(split: Partition): Seq[String] = {
val currSplit = split.asInstanceOf[CartesianPartition]
- rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)
+ (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
}
override def compute(split: Partition, context: TaskContext) = {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c540cd36eb..01b6c23dcc 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import spark.{Aggregator, Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
import spark.{Dependency, OneToOneDependency, ShuffleDependency}
@@ -52,13 +52,6 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
override def hashCode(): Int = idx
}
-private[spark] class CoGroupAggregator
- extends Aggregator[Any, Any, ArrayBuffer[Any]](
- { x => ArrayBuffer(x) },
- { (b, x) => b += x },
- { (b1, b2) => b1 ++ b2 })
- with Serializable
-
/**
* A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
@@ -66,34 +59,25 @@ private[spark] class CoGroupAggregator
*
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
- * @param mapSideCombine flag indicating whether to merge values before shuffle step. If the flag
- * is on, Spark does an extra pass over the data on the map side to merge
- * all values belonging to the same key together. This can reduce the amount
- * of data shuffled if and only if the number of distinct keys is very small,
- * and the ratio of key size to value size is also very small.
*/
-class CoGroupedRDD[K](
- @transient var rdds: Seq[RDD[(K, _)]],
- part: Partitioner,
- val mapSideCombine: Boolean = false,
- val serializerClass: String = null)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
- private val aggr = new CoGroupAggregator
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): CoGroupedRDD[K] = {
+ serializerClass = cls
+ this
+ }
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd =>
+ rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
+ logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
- logInfo("Adding shuffle dependency with " + rdd)
- if (mapSideCombine) {
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part, serializerClass)
- } else {
- new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
- }
+ logDebug("Adding shuffle dependency with " + rdd)
+ new ShuffleDependency[Any, Any](rdd, part, serializerClass)
}
}
}
@@ -138,23 +122,15 @@ class CoGroupedRDD[K](
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
- for ((k, v) <- rdd.iterator(itsSplit, context)) {
- getSeq(k.asInstanceOf[K])(depNum) += v
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
+ getSeq(kv._1)(depNum) += kv._2
}
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- if (mapSideCombine) {
- // With map side combine on, for each key, the shuffle fetcher returns a list of values.
- fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, values) => getSeq(key)(depNum) ++= values
- }
- } else {
- // With map side combine off, for each key the shuffle fetcher returns a single value.
- fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, value) => getSeq(key)(depNum) += value
- }
+ fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
+ kv => getSeq(kv._1)(depNum) += kv._2
}
}
}
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000000..a6bdce89d8
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, V]].partitions
+
+ override val partitioner = firstParent[Product2[K, V]].partitioner
+
+ override def compute(split: Partition, context: TaskContext) = {
+ firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
+ f(v).map(x => (k, x))
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000000..8334e3b557
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+
+import spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, U]].partitions
+
+ override val partitioner = firstParent[Product2[K, U]].partitioner
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+ firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+ }
+}
diff --git a/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000000..9154b76035
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
+ V: ClassManifest,
+ P <: Product2[K, V] : ClassManifest](
+ self: RDD[P])
+ extends Logging with Serializable {
+
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
+ def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
+ val part = new RangePartitioner(numPartitions, self, ascending)
+ val shuffled = new ShuffledRDD[K, V, P](self, part)
+ shuffled.mapPartitions(iter => {
+ val buf = iter.toArray
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
+ }, preservesPartitioning = true)
+ }
+}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 0137f80953..51c05af064 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -17,8 +17,7 @@
package spark.rdd
-import spark.{Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
-import spark.SparkContext._
+import spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -30,15 +29,24 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* The resulting RDD from a shuffle (e.g. repartitioning of data).
* @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
- * @param serializerClass class name of the serializer to use.
* @tparam K the key class.
* @tparam V the value class.
*/
-class ShuffledRDD[K, V](
- @transient prev: RDD[(K, V)],
- part: Partitioner,
- serializerClass: String = null)
- extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part, serializerClass))) {
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+ @transient var prev: RDD[P],
+ part: Partitioner)
+ extends RDD[P](prev.context, Nil) {
+
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
+ serializerClass = cls
+ this
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ List(new ShuffleDependency(prev, part, serializerClass))
+ }
override val partitioner = Some(part)
@@ -46,9 +54,14 @@ class ShuffledRDD[K, V](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
- SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
+ SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
SparkEnv.get.serializerManager.get(serializerClass))
}
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
}
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 0402b9f250..dadef5e17d 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -47,20 +47,26 @@ import spark.OneToOneDependency
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
- @transient var rdd1: RDD[(K, V)],
- @transient var rdd2: RDD[(K, W)],
- part: Partitioner,
- val serializerClass: String = null)
+ @transient var rdd1: RDD[_ <: Product2[K, V]],
+ @transient var rdd2: RDD[_ <: Product2[K, W]],
+ part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): SubtractedRDD[K, V, W] = {
+ serializerClass = cls
+ this
+ }
+
override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
+ logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
- logInfo("Adding shuffle dependency with " + rdd)
- new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass)
+ logDebug("Adding shuffle dependency with " + rdd)
+ new ShuffleDependency(rdd, part, serializerClass)
}
}
}
@@ -97,16 +103,14 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
seq
}
}
- def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match {
+ def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
- for (t <- rdd.iterator(itsSplit, context))
- op(t.asInstanceOf[(K, V)])
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
- val iter = SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index,
+ val iter = SparkEnv.get.shuffleFetcher.fetch[Product2[K, V]](shuffleId, partition.index,
context.taskMetrics, serializer)
- for (t <- iter)
- op(t.asInstanceOf[(K, V)])
+ iter.foreach(op)
}
}
// the first dep is rdd1; add all values to the map
diff --git a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
index 6a4fa13ad6..9a0831bd89 100644
--- a/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedPartitionsRDD.scala
@@ -55,27 +55,15 @@ abstract class ZippedPartitionsBaseRDD[V: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- // Note that as number of rdd's increase and/or number of slaves in cluster increase, the computed preferredLocations below
- // become diminishingly small : so we might need to look at alternate strategies to alleviate this.
- // If there are no (or very small number of preferred locations), we will end up transferred the blocks to 'any' node in the
- // cluster - paying with n/w and cache cost.
- // Maybe pick a node which figures max amount of time ?
- // Choose node which is hosting 'larger' of some subset of blocks ?
- // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
- val splits = s.asInstanceOf[ZippedPartitionsPartition].partitions
- val rddSplitZip = rdds.zip(splits)
-
- // exact match.
- val exactMatchPreferredLocations = rddSplitZip.map(x => x._1.preferredLocations(x._2))
- val exactMatchLocations = exactMatchPreferredLocations.reduce((x, y) => x.intersect(y))
-
- // Remove exact match and then do host local match.
- val exactMatchHosts = exactMatchLocations.map(Utils.parseHostPort(_)._1)
- val matchPreferredHosts = exactMatchPreferredLocations.map(locs => locs.map(Utils.parseHostPort(_)._1))
- .reduce((x, y) => x.intersect(y))
- val otherNodeLocalLocations = matchPreferredHosts.filter { s => !exactMatchHosts.contains(s) }
-
- otherNodeLocalLocations ++ exactMatchLocations
+ val parts = s.asInstanceOf[ZippedPartitionsPartition].partitions
+ val prefs = rdds.zip(parts).map { case (rdd, p) => rdd.preferredLocations(p) }
+ // Check whether there are any hosts that match all RDDs; otherwise return the union
+ val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+ if (!exactMatchLocations.isEmpty) {
+ exactMatchLocations
+ } else {
+ prefs.flatten.distinct
+ }
}
override def clearDependencies() {
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index b1c43b3195..4074e50e44 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -65,27 +65,16 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- // Note that as number of slaves in cluster increase, the computed preferredLocations can become small : so we might need
- // to look at alternate strategies to alleviate this. (If there are no (or very small number of preferred locations), we
- // will end up transferred the blocks to 'any' node in the cluster - paying with n/w and cache cost.
- // Maybe pick one or the other ? (so that atleast one block is local ?).
- // Choose node which is hosting 'larger' of the blocks ?
- // Look at rack locality to ensure chosen host is atleast rack local to both hosting node ?, etc (would be good to defer this if possible)
val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
val pref1 = rdd1.preferredLocations(partition1)
val pref2 = rdd2.preferredLocations(partition2)
-
- // exact match - instance local and host local.
+ // Check whether there are any hosts that match both RDDs; otherwise return the union
val exactMatchLocations = pref1.intersect(pref2)
-
- // remove locations which are already handled via exactMatchLocations, and intersect where both partitions are node local.
- val otherNodeLocalPref1 = pref1.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
- val otherNodeLocalPref2 = pref2.filter(loc => ! exactMatchLocations.contains(loc)).map(loc => Utils.parseHostPort(loc)._1)
- val otherNodeLocalLocations = otherNodeLocalPref1.intersect(otherNodeLocalPref2)
-
-
- // Can have mix of instance local (hostPort) and node local (host) locations as preference !
- exactMatchLocations ++ otherNodeLocalLocations
+ if (!exactMatchLocations.isEmpty) {
+ exactMatchLocations
+ } else {
+ (pref1 ++ pref2).distinct
+ }
}
override def clearDependencies() {
diff --git a/core/src/main/scala/spark/scheduler/ActiveJob.scala b/core/src/main/scala/spark/scheduler/ActiveJob.scala
index 71cc94edb6..fecc3e9648 100644
--- a/core/src/main/scala/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/spark/scheduler/ActiveJob.scala
@@ -25,7 +25,7 @@ import java.util.Properties
* Tracks information about an active job in the DAGScheduler.
*/
private[spark] class ActiveJob(
- val runId: Int,
+ val jobId: Int,
val finalStage: Stage,
val func: (TaskContext, Iterator[_]) => _,
val partitions: Array[Int],
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index fbf3f4c807..9402f18a0f 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -32,10 +32,22 @@ import spark.storage.{BlockManager, BlockManagerMaster}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
/**
- * A Scheduler subclass that implements stage-oriented scheduling. It computes a DAG of stages for
- * each job, keeps track of which RDDs and stage outputs are materialized, and computes a minimal
- * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
- * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
+ * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
+ * stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
+ * minimal schedule to run the job. It then submits stages as TaskSets to an underlying
+ * TaskScheduler implementation that runs them on the cluster.
+ *
+ * In addition to coming up with a DAG of stages, this class also determines the preferred
+ * locations to run each task on, based on the current cache status, and passes these to the
+ * low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
+ * lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
+ * not caused by shuffie file loss are handled by the TaskScheduler, which will retry each task
+ * a small number of times before cancelling the whole stage.
+ *
+ * THREADING: This class runs all its logic in a single thread executing the run() method, to which
+ * events are submitted using a synchonized queue (eventQueue). The public API methods, such as
+ * runJob, taskEnded and executorLost, post events asynchronously to this queue. All other methods
+ * should be private.
*/
private[spark]
class DAGScheduler(
@@ -72,8 +84,8 @@ class DAGScheduler(
}
// Called by TaskScheduler when a host is added
- override def executorGained(execId: String, hostPort: String) {
- eventQueue.put(ExecutorGained(execId, hostPort))
+ override def executorGained(execId: String, host: String) {
+ eventQueue.put(ExecutorGained(execId, host))
}
// Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
@@ -92,11 +104,11 @@ class DAGScheduler(
private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
- val nextRunId = new AtomicInteger(0)
+ val nextJobId = new AtomicInteger(0)
val nextStageId = new AtomicInteger(0)
- val idToStage = new TimeStampedHashMap[Int, Stage]
+ val stageIdToStage = new TimeStampedHashMap[Int, Stage]
val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
@@ -104,15 +116,16 @@ class DAGScheduler(
private val listenerBus = new SparkListenerBus()
- var cacheLocs = new HashMap[Int, Array[List[String]]]
+ // Contains the locations that each RDD's partitions are cached on
+ private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
- // For tracking failed nodes, we use the MapOutputTracker's generation number, which is
- // sent with every task. When we detect a node failing, we note the current generation number
- // and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
- // results.
- // TODO: Garbage collect information about failure generations when we know there are no more
+ // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
+ // every task. When we detect a node failing, we note the current epoch number and failed
+ // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
+ //
+ // TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
- val failedGeneration = new HashMap[String, Long]
+ val failedEpoch = new HashMap[String, Long]
val idToActiveJob = new HashMap[Int, ActiveJob]
@@ -141,11 +154,13 @@ class DAGScheduler(
listenerBus.addListener(listener)
}
- private def getCacheLocs(rdd: RDD[_]): Array[List[String]] = {
+ private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
if (!cacheLocs.contains(rdd.id)) {
val blockIds = rdd.partitions.indices.map(index=> "rdd_%d_%d".format(rdd.id, index)).toArray
- val locs = BlockManager.blockIdsToExecutorLocations(blockIds, env, blockManagerMaster)
- cacheLocs(rdd.id) = blockIds.map(locs.getOrElse(_, Nil))
+ val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
+ cacheLocs(rdd.id) = blockIds.map { id =>
+ locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
+ }
}
cacheLocs(rdd.id)
}
@@ -156,14 +171,14 @@ class DAGScheduler(
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
- * The priority value passed in will be used if the stage doesn't already exist with
- * a lower priority (we assume that priorities always increase across jobs for now).
+ * The jobId value passed in will be used if the stage doesn't already exist with
+ * a lower jobId (jobId always increases across jobs.)
*/
- private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], priority: Int): Stage = {
+ private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
- val stage = newStage(shuffleDep.rdd, Some(shuffleDep), priority)
+ val stage = newStage(shuffleDep.rdd, Some(shuffleDep), jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
@@ -171,13 +186,13 @@ class DAGScheduler(
/**
* Create a Stage for the given RDD, either as a shuffle map stage (for a ShuffleDependency) or
- * as a result stage for the final RDD used directly in an action. The stage will also be given
- * the provided priority.
+ * as a result stage for the final RDD used directly in an action. The stage will also be
+ * associated with the provided jobId.
*/
private def newStage(
rdd: RDD[_],
shuffleDep: Option[ShuffleDependency[_,_]],
- priority: Int,
+ jobId: Int,
callSite: Option[String] = None)
: Stage =
{
@@ -188,17 +203,17 @@ class DAGScheduler(
mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)
}
val id = nextStageId.getAndIncrement()
- val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, priority), priority, callSite)
- idToStage(id) = stage
+ val stage = new Stage(id, rdd, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
+ stageIdToStage(id) = stage
stageToInfos(stage) = StageInfo(stage)
stage
}
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
- * provided priority if they haven't already been created with a lower priority.
+ * provided jobId if they haven't already been created with a lower jobId.
*/
- private def getParentStages(rdd: RDD[_], priority: Int): List[Stage] = {
+ private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
@@ -209,7 +224,7 @@ class DAGScheduler(
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- parents += getShuffleMapStage(shufDep, priority)
+ parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
@@ -230,7 +245,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority)
+ val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
@@ -267,7 +282,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val toSubmit = JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter,
properties)
- return (toSubmit, waiter)
+ (toSubmit, waiter)
}
def runJob[T, U: ClassManifest](
@@ -314,8 +329,8 @@ class DAGScheduler(
val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
- eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener, properties))
- return listener.awaitResult() // Will throw an exception if the job fails
+ eventQueue.put(JobSubmitted(rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
+ listener.awaitResult() // Will throw an exception if the job fails
}
/**
@@ -325,11 +340,11 @@ class DAGScheduler(
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {
case JobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener, properties) =>
- val runId = nextRunId.getAndIncrement()
- val finalStage = newStage(finalRDD, None, runId, Some(callSite))
- val job = new ActiveJob(runId, finalStage, func, partitions, callSite, listener, properties)
+ val jobId = nextJobId.getAndIncrement()
+ val finalStage = newStage(finalRDD, None, jobId, Some(callSite))
+ val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
- logInfo("Got job " + job.runId + " (" + callSite + ") with " + partitions.length +
+ logInfo("Got job " + job.jobId + " (" + callSite + ") with " + partitions.length +
" output partitions (allowLocal=" + allowLocal + ")")
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
@@ -339,14 +354,14 @@ class DAGScheduler(
runLocally(job)
} else {
listenerBus.post(SparkListenerJobStart(job, properties))
- idToActiveJob(runId) = job
+ idToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
submitStage(finalStage)
}
- case ExecutorGained(execId, hostPort) =>
- handleExecutorGained(execId, hostPort)
+ case ExecutorGained(execId, host) =>
+ handleExecutorGained(execId, host)
case ExecutorLost(execId) =>
handleExecutorLost(execId)
@@ -360,7 +375,7 @@ class DAGScheduler(
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
- abortStage(idToStage(taskSet.stageId), reason)
+ abortStage(stageIdToStage(taskSet.stageId), reason)
case StopDAGScheduler =>
// Cancel any active jobs
@@ -371,7 +386,7 @@ class DAGScheduler(
}
return true
}
- return false
+ false
}
/**
@@ -383,7 +398,7 @@ class DAGScheduler(
clearCacheLocs()
val failed2 = failed.toArray
failed.clear()
- for (stage <- failed2.sortBy(_.priority)) {
+ for (stage <- failed2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -401,7 +416,7 @@ class DAGScheduler(
logTrace("failed: " + failed)
val waiting2 = waiting.toArray
waiting.clear()
- for (stage <- waiting2.sortBy(_.priority)) {
+ for (stage <- waiting2.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -448,7 +463,7 @@ class DAGScheduler(
*/
protected def runLocally(job: ActiveJob) {
logInfo("Computing the requested partition locally")
- new Thread("Local computation of job " + job.runId) {
+ new Thread("Local computation of job " + job.jobId) {
override def run() {
runLocallyWithinThread(job)
}
@@ -508,7 +523,7 @@ class DAGScheduler(
} else {
// This is a final stage; figure out its job's missing partitions
val job = resultStageToJob(stage)
- for (id <- 0 until job.numPartitions if (!job.finished(id))) {
+ for (id <- 0 until job.numPartitions if !job.finished(id)) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
@@ -516,9 +531,9 @@ class DAGScheduler(
}
// must be run listener before possible NotSerializableException
// should be "StageSubmitted" first and then "JobEnded"
- val properties = idToActiveJob(stage.priority).properties
+ val properties = idToActiveJob(stage.jobId).properties
listenerBus.post(SparkListenerStageSubmitted(stage, tasks.size, properties))
-
+
if (tasks.size > 0) {
// Preemptively serialize a task to make sure it can be serialized. We are catching this
// exception here because it would be fairly hard to catch the non-serializable exception
@@ -537,7 +552,7 @@ class DAGScheduler(
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
taskSched.submitTasks(
- new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority, properties))
+ new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
if (!stage.submissionTime.isDefined) {
stage.submissionTime = Some(System.currentTimeMillis())
}
@@ -554,7 +569,7 @@ class DAGScheduler(
*/
private def handleTaskCompletion(event: CompletionEvent) {
val task = event.task
- val stage = idToStage(task.stageId)
+ val stage = stageIdToStage(task.stageId)
def markStageAsFinished(stage: Stage) = {
val serviceTime = stage.submissionTime match {
@@ -583,7 +598,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- idToActiveJob -= stage.priority
+ idToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -599,7 +614,7 @@ class DAGScheduler(
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
- if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
+ if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partition, status)
@@ -611,16 +626,16 @@ class DAGScheduler(
logInfo("waiting: " + waiting)
logInfo("failed: " + failed)
if (stage.shuffleDep != None) {
- // We supply true to increment the generation number here in case this is a
+ // We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
// locations with holes (from when we detected the error) and will need the
- // generation incremented to refetch them.
- // TODO: Only increment the generation number if this is not the first time
+ // epoch incremented to refetch them.
+ // TODO: Only increment the epoch number if this is not the first time
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
stage.shuffleDep.get.shuffleId,
stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
- true)
+ changeEpoch = true)
}
clearCacheLocs()
if (stage.outputLocs.count(_ == Nil) != 0) {
@@ -654,7 +669,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
- val failedStage = idToStage(task.stageId)
+ val failedStage = stageIdToStage(task.stageId)
running -= failedStage
failed += failedStage
// TODO: Cancel running tasks in the stage
@@ -674,7 +689,7 @@ class DAGScheduler(
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
- handleExecutorLost(bmAddress.executorId, Some(task.generation))
+ handleExecutorLost(bmAddress.executorId, Some(task.epoch))
}
case ExceptionFailure(className, description, stackTrace, metrics) =>
@@ -682,7 +697,7 @@ class DAGScheduler(
case other =>
// Unrecognized failure - abort all jobs depending on this stage
- abortStage(idToStage(task.stageId), task + " failed: " + other)
+ abortStage(stageIdToStage(task.stageId), task + " failed: " + other)
}
}
@@ -690,36 +705,36 @@ class DAGScheduler(
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
- * Optionally the generation during which the failure was caught can be passed to avoid allowing
+ * Optionally the epoch during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
- private def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
- val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
- if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
- failedGeneration(execId) = currentGeneration
- logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
+ private def handleExecutorLost(execId: String, maybeEpoch: Option[Long] = None) {
+ val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
+ if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
+ failedEpoch(execId) = currentEpoch
+ logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
blockManagerMaster.removeExecutor(execId)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
- mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
+ mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
- mapOutputTracker.incrementGeneration()
+ mapOutputTracker.incrementEpoch()
}
clearCacheLocs()
} else {
logDebug("Additional executor lost message for " + execId +
- "(generation " + currentGeneration + ")")
+ "(epoch " + currentEpoch + ")")
}
}
- private def handleExecutorGained(execId: String, hostPort: String) {
- // remove from failedGeneration(execId) ?
- if (failedGeneration.contains(execId)) {
- logInfo("Host gained which was in lost list earlier: " + hostPort)
- failedGeneration -= execId
+ private def handleExecutorGained(execId: String, host: String) {
+ // remove from failedEpoch(execId) ?
+ if (failedEpoch.contains(execId)) {
+ logInfo("Host gained which was in lost list earlier: " + host)
+ failedEpoch -= execId
}
}
@@ -735,7 +750,7 @@ class DAGScheduler(
val error = new SparkException("Job failed: " + reason)
job.listener.jobFailed(error)
listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
- idToActiveJob -= resultStage.priority
+ idToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
}
@@ -759,7 +774,7 @@ class DAGScheduler(
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
- val mapStage = getShuffleMapStage(shufDep, stage.priority)
+ val mapStage = getShuffleMapStage(shufDep, stage.jobId)
if (!mapStage.isAvailable) {
visitedStages += mapStage
visit(mapStage.rdd)
@@ -774,16 +789,16 @@ class DAGScheduler(
visitedRdds.contains(target.rdd)
}
- private def getPreferredLocs(rdd: RDD[_], partition: Int): List[String] = {
+ private def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
- if (cached != Nil) {
+ if (!cached.isEmpty) {
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
- if (rddPrefs != Nil) {
- return rddPrefs
+ if (!rddPrefs.isEmpty) {
+ return rddPrefs.map(host => TaskLocation(host))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dep
// that has any placement preferences. Ideally we would choose based on transfer sizes,
@@ -797,13 +812,13 @@ class DAGScheduler(
}
case _ =>
})
- return Nil
+ Nil
}
private def cleanup(cleanupTime: Long) {
- var sizeBefore = idToStage.size
- idToStage.clearOldValues(cleanupTime)
- logInfo("idToStage " + sizeBefore + " --> " + idToStage.size)
+ var sizeBefore = stageIdToStage.size
+ stageIdToStage.clearOldValues(cleanupTime)
+ logInfo("stageIdToStage " + sizeBefore + " --> " + stageIdToStage.size)
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
index 3b4ee6287a..b8ba0e9239 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerEvent.scala
@@ -54,9 +54,7 @@ private[spark] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
-private[spark] case class ExecutorGained(execId: String, hostPort: String) extends DAGSchedulerEvent {
- Utils.checkHostPort(hostPort, "Required hostport")
-}
+private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
diff --git a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
index 87d27cc70d..98c4fb7e59 100644
--- a/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/spark/scheduler/DAGSchedulerSource.scala
@@ -21,7 +21,7 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends
})
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.nextRunId.get()
+ override def getValue: Int = dagScheduler.nextJobId.get()
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
index 7194fcaa49..1bc9fabdff 100644
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/spark/scheduler/JobLogger.scala
@@ -102,7 +102,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
protected def buildJobDep(jobID: Int, stage: Stage) {
- if (stage.priority == jobID) {
+ if (stage.jobId == jobID) {
jobIDToStages.get(jobID) match {
case Some(stageList) => stageList += stage
case None => val stageList = new ListBuffer[Stage]
@@ -178,12 +178,12 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
}else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
}
- if (stage.priority == jobID) {
+ if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else
- jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.priority, false)
+ jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
}
// Record task metrics into job log files
@@ -260,7 +260,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
val job = jobEnd.job
- var info = "JOB_ID=" + job.runId
+ var info = "JOB_ID=" + job.jobId
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
@@ -268,8 +268,8 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
}
- jobLogInfo(job.runId, info.substring(0, info.length - 1).toUpperCase)
- closeLogWriter(job.runId)
+ jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
+ closeLogWriter(job.jobId)
}
protected def recordJobProperties(jobID: Int, properties: Properties) {
@@ -282,11 +282,11 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
override def onJobStart(jobStart: SparkListenerJobStart) {
val job = jobStart.job
val properties = jobStart.properties
- createLogWriter(job.runId)
- recordJobProperties(job.runId, properties)
- buildJobDep(job.runId, job.finalStage)
- recordStageDep(job.runId)
- recordStageDepGraph(job.runId, job.finalStage)
- jobLogInfo(job.runId, "JOB_ID=" + job.runId + " STATUS=STARTED")
+ createLogWriter(job.jobId)
+ recordJobProperties(job.jobId, properties)
+ buildJobDep(job.jobId, job.finalStage)
+ recordStageDep(job.jobId)
+ recordStageDepGraph(job.jobId, job.finalStage)
+ jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
}
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 832ca18b8c..d066df5dc1 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -73,7 +73,7 @@ private[spark] class ResultTask[T, U](
var rdd: RDD[T],
var func: (TaskContext, Iterator[T]) => U,
var partition: Int,
- @transient locs: Seq[String],
+ @transient locs: Seq[TaskLocation],
val outputId: Int)
extends Task[U](stageId) with Externalizable {
@@ -85,11 +85,8 @@ private[spark] class ResultTask[T, U](
rdd.partitions(partition)
}
- private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
-
- {
- // DEBUG code
- preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
+ @transient private val preferredLocs: Seq[TaskLocation] = {
+ if (locs == null) Nil else locs.toSet.toSeq
}
override def run(attemptId: Long): U = {
@@ -102,7 +99,7 @@ private[spark] class ResultTask[T, U](
}
}
- override def preferredLocations: Seq[String] = preferredLocs
+ override def preferredLocations: Seq[TaskLocation] = preferredLocs
override def toString = "ResultTask(" + stageId + ", " + partition + ")"
@@ -116,7 +113,7 @@ private[spark] class ResultTask[T, U](
out.write(bytes)
out.writeInt(partition)
out.writeInt(outputId)
- out.writeLong(generation)
+ out.writeLong(epoch)
out.writeObject(split)
}
}
@@ -131,7 +128,7 @@ private[spark] class ResultTask[T, U](
func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
partition = in.readInt()
val outputId = in.readInt()
- generation = in.readLong()
+ epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index e3bb6d1e60..f2a038576b 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -88,18 +88,15 @@ private[spark] class ShuffleMapTask(
var rdd: RDD[_],
var dep: ShuffleDependency[_,_],
var partition: Int,
- @transient private var locs: Seq[String])
+ @transient private var locs: Seq[TaskLocation])
extends Task[MapStatus](stageId)
with Externalizable
with Logging {
protected def this() = this(0, null, null, 0, null)
- @transient private val preferredLocs: Seq[String] = if (locs == null) Nil else locs.toSet.toSeq
-
- {
- // DEBUG code
- preferredLocs.foreach (hostPort => Utils.checkHost(Utils.parseHostPort(hostPort)._1, "preferredLocs : " + preferredLocs))
+ @transient private val preferredLocs: Seq[TaskLocation] = {
+ if (locs == null) Nil else locs.toSet.toSeq
}
var split = if (rdd == null) null else rdd.partitions(partition)
@@ -112,7 +109,7 @@ private[spark] class ShuffleMapTask(
out.writeInt(bytes.length)
out.write(bytes)
out.writeInt(partition)
- out.writeLong(generation)
+ out.writeLong(epoch)
out.writeObject(split)
}
}
@@ -126,7 +123,7 @@ private[spark] class ShuffleMapTask(
rdd = rdd_
dep = dep_
partition = in.readInt()
- generation = in.readLong()
+ epoch = in.readLong()
split = in.readObject().asInstanceOf[Partition]
}
@@ -148,7 +145,7 @@ private[spark] class ShuffleMapTask(
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {
- val pair = elem.asInstanceOf[(Any, Any)]
+ val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
}
@@ -186,7 +183,7 @@ private[spark] class ShuffleMapTask(
}
}
- override def preferredLocations: Seq[String] = preferredLocs
+ override def preferredLocations: Seq[TaskLocation] = preferredLocs
override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
index 2a09a956ad..e5531011c2 100644
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/spark/scheduler/SparkListener.scala
@@ -153,7 +153,7 @@ object StatsReportListener extends Logging {
}
def showBytesDistribution(heading: String, dist: Distribution) {
- showDistribution(heading, dist, (d => Utils.memoryBytesToString(d.toLong)): Double => String)
+ showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
}
def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
index 5428daeb94..c599c00ac4 100644
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/spark/scheduler/Stage.scala
@@ -33,15 +33,16 @@ import spark.storage.BlockManagerId
* initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
* that each output partition is on.
*
- * Each Stage also has a priority, which is (by default) based on the job it was submitted in.
- * This allows Stages from earlier jobs to be computed first or recovered faster on failure.
+ * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
+ * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
+ * faster on failure.
*/
private[spark] class Stage(
val id: Int,
val rdd: RDD[_],
val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
val parents: List[Stage],
- val priority: Int,
+ val jobId: Int,
callSite: Option[String])
extends Logging {
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
index 50768d43e0..0ab2ae6cfe 100644
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ b/core/src/main/scala/spark/scheduler/Task.scala
@@ -30,9 +30,9 @@ import spark.executor.TaskMetrics
*/
private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
def run(attemptId: Long): T
- def preferredLocations: Seq[String] = Nil
+ def preferredLocations: Seq[TaskLocation] = Nil
- var generation: Long = -1 // Map output tracker generation. Will be set by TaskScheduler.
+ var epoch: Long = -1 // Map output tracker epoch. Will be set by TaskScheduler.
var metrics: Option[TaskMetrics] = None
diff --git a/core/src/main/scala/spark/scheduler/TaskLocation.scala b/core/src/main/scala/spark/scheduler/TaskLocation.scala
new file mode 100644
index 0000000000..fea117e956
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/TaskLocation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler
+
+/**
+ * A location where a task should run. This can either be a host or a (host, executorID) pair.
+ * In the latter case, we will prefer to launch the task on that executorID, but our next level
+ * of preference will be executors on the same host if this is not possible.
+ */
+private[spark]
+class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
+ override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
+}
+
+private[spark] object TaskLocation {
+ def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
+
+ def apply(host: String) = new TaskLocation(host, None)
+}
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index 89793e0e82..fc4856756b 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -28,7 +28,9 @@ import java.nio.ByteBuffer
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
private[spark]
-class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics) extends Externalizable {
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+ extends Externalizable
+{
def this() = this(null.asInstanceOf[T], null, null)
override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
index 2cdeb1c8c0..64be50b2d0 100644
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
@@ -35,7 +35,7 @@ private[spark] trait TaskSchedulerListener {
taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
// A node was added to the cluster.
- def executorGained(execId: String, hostPort: String): Unit
+ def executorGained(execId: String, host: String): Unit
// A node was lost from the cluster.
def executorLost(execId: String): Unit
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 96568e0d27..679d899b47 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -33,45 +33,27 @@ import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
- * start(), then submit task sets through the runTasks method.
+ * initialize() and start(), then submit task sets through the runTasks method.
+ *
+ * This class can work with multiple types of clusters by acting through a SchedulerBackend.
+ * It handles common logic, like determining a scheduling order across jobs, waking up to launch
+ * speculative tasks, etc.
+ *
+ * THREADING: SchedulerBackends and task-submitting clients can call this class from multiple
+ * threads, so it needs locks in public API methods to maintain its state. In addition, some
+ * SchedulerBackends sycnchronize on themselves when they want to send events here, and then
+ * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
+ * we are holding a lock on ourselves.
*/
private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
- with Logging {
-
+ with Logging
+{
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
- // How often to revive offers in case there are pending tasks - that is how often to try to get
- // tasks scheduled in case there are nodes available : default 0 is to disable it - to preserve existing behavior
- // Note that this is required due to delayed scheduling due to data locality waits, etc.
- // TODO: rename property ?
- val TASK_REVIVAL_INTERVAL = System.getProperty("spark.tasks.revive.interval", "0").toLong
-
- /*
- This property controls how aggressive we should be to modulate waiting for node local task scheduling.
- To elaborate, currently there is a time limit (3 sec def) to ensure that spark attempts to wait for node locality of tasks before
- scheduling on other nodes. We have modified this in yarn branch such that offers to task set happen in prioritized order :
- node-local, rack-local and then others
- But once all available node local (and no pref) tasks are scheduled, instead of waiting for 3 sec before
- scheduling to other nodes (which degrades performance for time sensitive tasks and on larger clusters), we can
- modulate that : to also allow rack local nodes or any node. The default is still set to HOST - so that previous behavior is
- maintained. This is to allow tuning the tension between pulling rdd data off node and scheduling computation asap.
-
- TODO: rename property ? The value is one of
- - NODE_LOCAL (default, no change w.r.t current behavior),
- - RACK_LOCAL and
- - ANY
-
- Note that this property makes more sense when used in conjugation with spark.tasks.revive.interval > 0 : else it is not very effective.
-
- Additional Note: For non trivial clusters, there is a 4x - 5x reduction in running time (in some of our experiments) based on whether
- it is left at default NODE_LOCAL, RACK_LOCAL (if cluster is configured to be rack aware) or ANY.
- If cluster is rack aware, then setting it to RACK_LOCAL gives best tradeoff and a 3x - 4x performance improvement while minimizing IO impact.
- Also, it brings down the variance in running time drastically.
- */
- val TASK_SCHEDULING_AGGRESSION = TaskLocality.parse(System.getProperty("spark.tasks.schedule.aggression", "NODE_LOCAL"))
val activeTaskSets = new HashMap[String, TaskSetManager]
@@ -89,16 +71,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Which executor IDs we have executors on
val activeExecutorIds = new HashSet[String]
- // TODO: We might want to remove this and merge it with execId datastructures - but later.
- // Which hosts in the cluster are alive (contains hostPort's) - used for process local and node local task locality.
- private val hostPortsAlive = new HashSet[String]
- private val hostToAliveHostPorts = new HashMap[String, HashSet[String]]
-
// The set of executors we have on each host; this is used to compute hostsAlive, which
// in turn is used to decide when we can attain data locality on a given host
- private val executorsByHostPort = new HashMap[String, HashSet[String]]
+ private val executorsByHost = new HashMap[String, HashSet[String]]
- private val executorIdToHostPort = new HashMap[String, String]
+ private val executorIdToHost = new HashMap[String, String]
// JAR server, if any JARs were added by the user to the SparkContext
var jarServer: HttpServer = null
@@ -136,23 +113,14 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
schedulableBuilder.buildPools()
- // resolve executorId to hostPort mapping.
- def executorToHostPort(executorId: String, defaultHostPort: String): String = {
- executorIdToHostPort.getOrElse(executorId, defaultHostPort)
- }
-
- // Unfortunately, this means that SparkEnv is indirectly referencing ClusterScheduler
- // Will that be a design violation ?
- SparkEnv.get.executorIdToHostPort = Some(executorToHostPort)
}
-
def newTaskId(): Long = nextTaskId.getAndIncrement()
override def start() {
backend.start()
- if (JBoolean.getBoolean("spark.speculation")) {
+ if (System.getProperty("spark.speculation", "false").toBoolean) {
new Thread("ClusterScheduler speculation check") {
setDaemon(true)
@@ -169,27 +137,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}.start()
}
-
-
- // Change to always run with some default if TASK_REVIVAL_INTERVAL <= 0 ?
- if (TASK_REVIVAL_INTERVAL > 0) {
- new Thread("ClusterScheduler task offer revival check") {
- setDaemon(true)
-
- override def run() {
- logInfo("Starting speculative task offer revival thread")
- while (true) {
- try {
- Thread.sleep(TASK_REVIVAL_INTERVAL)
- } catch {
- case e: InterruptedException => {}
- }
-
- if (hasPendingTasks()) backend.reviveOffers()
- }
- }
- }.start()
- }
}
override def submitTasks(taskSet: TaskSet) {
@@ -201,7 +148,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
- if (hasReceivedTask == false) {
+ if (!hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
@@ -214,7 +161,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
- hasReceivedTask = true;
+ hasReceivedTask = true
}
backend.reviveOffers()
}
@@ -235,172 +182,55 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
- def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = {
- synchronized {
- SparkEnv.set(sc.env)
- // Mark each slave as alive and remember its hostname
- for (o <- offers) {
- // DEBUG Code
- Utils.checkHostPort(o.hostPort)
-
- executorIdToHostPort(o.executorId) = o.hostPort
- if (! executorsByHostPort.contains(o.hostPort)) {
- executorsByHostPort(o.hostPort) = new HashSet[String]()
- }
-
- hostPortsAlive += o.hostPort
- hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(o.hostPort)._1, new HashSet[String]).add(o.hostPort)
- executorGained(o.executorId, o.hostPort)
+ def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+ SparkEnv.set(sc.env)
+
+ // Mark each slave as alive and remember its hostname
+ for (o <- offers) {
+ executorIdToHost(o.executorId) = o.host
+ if (!executorsByHost.contains(o.host)) {
+ executorsByHost(o.host) = new HashSet[String]()
+ executorGained(o.executorId, o.host)
}
- // Build a list of tasks to assign to each slave
- val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
- // merge availableCpus into nodeToAvailableCpus block ?
- val availableCpus = offers.map(o => o.cores).toArray
- val nodeToAvailableCpus = {
- val map = new HashMap[String, Int]()
- for (offer <- offers) {
- val hostPort = offer.hostPort
- val cores = offer.cores
- // DEBUG code
- Utils.checkHostPort(hostPort)
-
- val host = Utils.parseHostPort(hostPort)._1
-
- map.put(host, map.getOrElse(host, 0) + cores)
- }
-
- map
- }
- var launchedTask = false
- val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-
- for (manager <- sortedTaskSetQueue) {
- logDebug("parentName:%s, name:%s, runningTasks:%s".format(
- manager.parent.name, manager.name, manager.runningTasks))
- }
-
- for (manager <- sortedTaskSetQueue) {
+ }
- // Split offers based on node local, rack local and off-rack tasks.
- val processLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
- val nodeLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
- val rackLocalOffers = new HashMap[String, ArrayBuffer[Int]]()
- val otherOffers = new HashMap[String, ArrayBuffer[Int]]()
+ // Build a list of tasks to assign to each worker
+ val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ val availableCpus = offers.map(o => o.cores).toArray
+ val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+ for (taskSet <- sortedTaskSets) {
+ logDebug("parentName: %s, name: %s, runningTasks: %s".format(
+ taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+ }
+ // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+ // of locality levels so that it gets a chance to launch local tasks on all of them.
+ var launchedTask = false
+ for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+ do {
+ launchedTask = false
for (i <- 0 until offers.size) {
- val hostPort = offers(i).hostPort
- // DEBUG code
- Utils.checkHostPort(hostPort)
-
- val numProcessLocalTasks = math.max(0, math.min(manager.numPendingTasksForHostPort(hostPort), availableCpus(i)))
- if (numProcessLocalTasks > 0){
- val list = processLocalOffers.getOrElseUpdate(hostPort, new ArrayBuffer[Int])
- for (j <- 0 until numProcessLocalTasks) list += i
- }
-
- val host = Utils.parseHostPort(hostPort)._1
- val numNodeLocalTasks = math.max(0,
- // Remove process local tasks (which are also host local btw !) from this
- math.min(manager.numPendingTasksForHost(hostPort) - numProcessLocalTasks, nodeToAvailableCpus(host)))
- if (numNodeLocalTasks > 0){
- val list = nodeLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- for (j <- 0 until numNodeLocalTasks) list += i
- }
-
- val numRackLocalTasks = math.max(0,
- // Remove node local tasks (which are also rack local btw !) from this
- math.min(manager.numRackLocalPendingTasksForHost(hostPort) - numProcessLocalTasks - numNodeLocalTasks, nodeToAvailableCpus(host)))
- if (numRackLocalTasks > 0){
- val list = rackLocalOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- for (j <- 0 until numRackLocalTasks) list += i
- }
- if (numNodeLocalTasks <= 0 && numRackLocalTasks <= 0){
- // add to others list - spread even this across cluster.
- val list = otherOffers.getOrElseUpdate(host, new ArrayBuffer[Int])
- list += i
+ val execId = offers(i).executorId
+ val host = offers(i).host
+ for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+ tasks(i) += task
+ val tid = task.taskId
+ taskIdToTaskSetId(tid) = taskSet.taskSet.id
+ taskSetTaskIds(taskSet.taskSet.id) += tid
+ taskIdToExecutorId(tid) = execId
+ activeExecutorIds += execId
+ executorsByHost(host) += execId
+ availableCpus(i) -= 1
+ launchedTask = true
}
}
+ } while (launchedTask)
+ }
- val offersPriorityList = new ArrayBuffer[Int](
- processLocalOffers.size + nodeLocalOffers.size + rackLocalOffers.size + otherOffers.size)
-
- // First process local, then host local, then rack, then others
-
- // numNodeLocalOffers contains count of both process local and host offers.
- val numNodeLocalOffers = {
- val processLocalPriorityList = ClusterScheduler.prioritizeContainers(processLocalOffers)
- offersPriorityList ++= processLocalPriorityList
-
- val nodeLocalPriorityList = ClusterScheduler.prioritizeContainers(nodeLocalOffers)
- offersPriorityList ++= nodeLocalPriorityList
-
- processLocalPriorityList.size + nodeLocalPriorityList.size
- }
- val numRackLocalOffers = {
- val rackLocalPriorityList = ClusterScheduler.prioritizeContainers(rackLocalOffers)
- offersPriorityList ++= rackLocalPriorityList
- rackLocalPriorityList.size
- }
- offersPriorityList ++= ClusterScheduler.prioritizeContainers(otherOffers)
-
- var lastLoop = false
- val lastLoopIndex = TASK_SCHEDULING_AGGRESSION match {
- case TaskLocality.NODE_LOCAL => numNodeLocalOffers
- case TaskLocality.RACK_LOCAL => numRackLocalOffers + numNodeLocalOffers
- case TaskLocality.ANY => offersPriorityList.size
- }
-
- do {
- launchedTask = false
- var loopCount = 0
- for (i <- offersPriorityList) {
- val execId = offers(i).executorId
- val hostPort = offers(i).hostPort
-
- // If last loop and within the lastLoopIndex, expand scope - else use null (which will use default/existing)
- val overrideLocality = if (lastLoop && loopCount < lastLoopIndex) TASK_SCHEDULING_AGGRESSION else null
-
- // If last loop, override waiting for host locality - we scheduled all local tasks already and there might be more available ...
- loopCount += 1
-
- manager.slaveOffer(execId, hostPort, availableCpus(i), overrideLocality) match {
- case Some(task) =>
- tasks(i) += task
- val tid = task.taskId
- taskIdToTaskSetId(tid) = manager.taskSet.id
- taskSetTaskIds(manager.taskSet.id) += tid
- taskIdToExecutorId(tid) = execId
- activeExecutorIds += execId
- executorsByHostPort(hostPort) += execId
- availableCpus(i) -= 1
- launchedTask = true
-
- case None => {}
- }
- }
- // Loop once more - when lastLoop = true, then we try to schedule task on all nodes irrespective of
- // data locality (we still go in order of priority : but that would not change anything since
- // if data local tasks had been available, we would have scheduled them already)
- if (lastLoop) {
- // prevent more looping
- launchedTask = false
- } else if (!lastLoop && !launchedTask) {
- // Do this only if TASK_SCHEDULING_AGGRESSION != NODE_LOCAL
- if (TASK_SCHEDULING_AGGRESSION != TaskLocality.NODE_LOCAL) {
- // fudge launchedTask to ensure we loop once more
- launchedTask = true
- // dont loop anymore
- lastLoop = true
- }
- }
- } while (launchedTask)
- }
-
- if (tasks.size > 0) {
- hasLaunchedTask = true
- }
- return tasks
+ if (tasks.size > 0) {
+ hasLaunchedTask = true
}
+ return tasks
}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
@@ -448,7 +278,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
backend.reviveOffers()
}
if (taskFailed) {
-
// Also revive offers if a task had failed for some reason other than host lost
backend.reviveOffers()
}
@@ -503,7 +332,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
// Check for pending tasks in all our active jobs.
- def hasPendingTasks(): Boolean = {
+ def hasPendingTasks: Boolean = {
synchronized {
rootPool.hasPendingTasks()
}
@@ -514,7 +343,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
synchronized {
if (activeExecutorIds.contains(executorId)) {
- val hostPort = executorIdToHostPort(executorId)
+ val hostPort = executorIdToHost(executorId)
logError("Lost executor %s on %s: %s".format(executorId, hostPort, reason))
removeExecutor(executorId)
failedExecutor = Some(executorId)
@@ -536,88 +365,63 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
/** Remove an executor from all our data structures and mark it as lost */
private def removeExecutor(executorId: String) {
activeExecutorIds -= executorId
- val hostPort = executorIdToHostPort(executorId)
- if (hostPortsAlive.contains(hostPort)) {
- // DEBUG Code
- Utils.checkHostPort(hostPort)
-
- hostPortsAlive -= hostPort
- hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
- }
-
- val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
+ val host = executorIdToHost(executorId)
+ val execs = executorsByHost.getOrElse(host, new HashSet)
execs -= executorId
if (execs.isEmpty) {
- executorsByHostPort -= hostPort
+ executorsByHost -= host
}
- executorIdToHostPort -= executorId
- rootPool.executorLost(executorId, hostPort)
+ executorIdToHost -= executorId
+ rootPool.executorLost(executorId, host)
}
- def executorGained(execId: String, hostPort: String) {
- listener.executorGained(execId, hostPort)
+ def executorGained(execId: String, host: String) {
+ listener.executorGained(execId, host)
}
- def getExecutorsAliveOnHost(host: String): Option[Set[String]] = {
- Utils.checkHost(host)
-
- val retval = hostToAliveHostPorts.get(host)
- if (retval.isDefined) {
- return Some(retval.get.toSet)
- }
+ def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
+ executorsByHost.get(host).map(_.toSet)
+ }
- None
+ def hasExecutorsAliveOnHost(host: String): Boolean = synchronized {
+ executorsByHost.contains(host)
}
- def isExecutorAliveOnHostPort(hostPort: String): Boolean = {
- // Even if hostPort is a host, it does not matter - it is just a specific check.
- // But we do have to ensure that only hostPort get into hostPortsAlive !
- // So no check against Utils.checkHostPort
- hostPortsAlive.contains(hostPort)
+ def isExecutorAlive(execId: String): Boolean = synchronized {
+ activeExecutorIds.contains(execId)
}
// By default, rack is unknown
def getRackForHost(value: String): Option[String] = None
-
- // By default, (cached) hosts for rack is unknown
- def getCachedHostsForRack(rack: String): Option[Set[String]] = None
}
object ClusterScheduler {
-
- // Used to 'spray' available containers across the available set to ensure too many containers on same host
- // are not used up. Used in yarn mode and in task scheduling (when there are multiple containers available
- // to execute a task)
- // For example: yarn can returns more containers than we would have requested under ANY, this method
- // prioritizes how to use the allocated containers.
- // flatten the map such that the array buffer entries are spread out across the returned value.
- // given <host, list[container]> == <h1, [c1 .. c5]>, <h2, [c1 .. c3]>, <h3, [c1, c2]>, <h4, c1>, <h5, c1>, i
- // the return value would be something like : h1c1, h2c1, h3c1, h4c1, h5c1, h1c2, h2c2, h3c2, h1c3, h2c3, h1c4, h1c5
- // We then 'use' the containers in this order (consuming only the top K from this list where
- // K = number to be user). This is to ensure that if we have multiple eligible allocations,
- // they dont end up allocating all containers on a small number of hosts - increasing probability of
- // multiple container failure when a host goes down.
- // Note, there is bias for keys with higher number of entries in value to be picked first (by design)
- // Also note that invocation of this method is expected to have containers of same 'type'
- // (host-local, rack-local, off-rack) and not across types : so that reordering is simply better from
- // the available list - everything else being same.
- // That is, we we first consume data local, then rack local and finally off rack nodes. So the
- // prioritization from this method applies to within each category
+ /**
+ * Used to balance containers across hosts.
+ *
+ * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+ * resource offers representing the order in which the offers should be used. The resource
+ * offers are ordered such that we'll allocate one container on each host before allocating a
+ * second container on any host, and so on, in order to reduce the damage if a host fails.
+ *
+ * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, returns
+ * [o1, o5, o4, 02, o6, o3]
+ */
def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
val _keyList = new ArrayBuffer[K](map.size)
_keyList ++= map.keys
// order keyList based on population of value in map
val keyList = _keyList.sortWith(
- (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+ (left, right) => map(left).size > map(right).size
)
val retval = new ArrayBuffer[T](keyList.size * 2)
var index = 0
var found = true
- while (found){
+ while (found) {
found = false
for (key <- keyList) {
val containerList: ArrayBuffer[T] = map.get(key).getOrElse(null)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 7f855cd345..a4d6880abb 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -29,60 +29,33 @@ import scala.math.min
import spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState, Utils}
import spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
import spark.TaskState.TaskState
-import spark.scheduler.{ShuffleMapTask, Task, TaskResult, TaskSet}
+import spark.scheduler._
+import scala.Some
+import spark.FetchFailed
+import spark.ExceptionFailure
+import spark.TaskResultTooBigFailure
+import spark.util.{SystemClock, Clock}
-private[spark] object TaskLocality
- extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY") with Logging {
-
- // process local is expected to be used ONLY within tasksetmanager for now.
- val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
-
- type TaskLocality = Value
-
- def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
-
- // Must not be the constraint.
- assert (constraint != TaskLocality.PROCESS_LOCAL)
-
- constraint match {
- case TaskLocality.NODE_LOCAL =>
- condition == TaskLocality.NODE_LOCAL
- case TaskLocality.RACK_LOCAL =>
- condition == TaskLocality.NODE_LOCAL || condition == TaskLocality.RACK_LOCAL
- // For anything else, allow
- case _ => true
- }
- }
-
- def parse(str: String): TaskLocality = {
- // better way to do this ?
- try {
- val retval = TaskLocality.withName(str)
- // Must not specify PROCESS_LOCAL !
- assert (retval != TaskLocality.PROCESS_LOCAL)
- retval
- } catch {
- case nEx: NoSuchElementException => {
- logWarning("Invalid task locality specified '" + str + "', defaulting to NODE_LOCAL")
- // default to preserve earlier behavior
- NODE_LOCAL
- }
- }
- }
-}
-
/**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler.
+ * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * the status of each task, retries tasks if they fail (up to a limited number of times), and
+ * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
+ * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
+ * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the
+ * ClusterScheduler (e.g. its event handlers). It should not be called from other threads.
*/
-private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet: TaskSet)
- extends TaskSetManager with Logging {
-
- // Maximum time to wait to run a task in a preferred location (in ms)
- val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
-
+private[spark] class ClusterTaskSetManager(
+ sched: ClusterScheduler,
+ val taskSet: TaskSet,
+ clock: Clock = SystemClock)
+ extends TaskSetManager
+ with Logging
+{
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+ val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -110,31 +83,27 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
var stageId = taskSet.stageId
var name = "TaskSet_"+taskSet.stageId.toString
var parent: Schedulable = null
- // Last time when we launched a preferred task (for delay scheduling)
- var lastPreferredLaunchTime = System.currentTimeMillis
- // List of pending tasks for each node (process local to container).
- // These collections are actually
+ // Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task failed, it is put
// back at the head of the stack. They are also only cleaned up lazily;
// when a task is launched, it remains in all the pending lists except
// the one that it was launched from, but gets removed from them later.
- private val pendingTasksForHostPort = new HashMap[String, ArrayBuffer[Int]]
+ private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
- // List of pending tasks for each node.
- // Essentially, similar to pendingTasksForHostPort, except at host level
+ // Set of pending tasks for each host. Similar to pendingTasksForExecutor,
+ // but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
- // List of pending tasks for each node based on rack locality.
- // Essentially, similar to pendingTasksForHost, except at rack level
- private val pendingRackLocalTasksForHost = new HashMap[String, ArrayBuffer[Int]]
+ // Set of pending tasks for each rack -- similar to the above.
+ private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
- // List containing pending tasks with no locality preferences
+ // Set containing pending tasks with no locality preferences.
val pendingTasksWithNoPrefs = new ArrayBuffer[Int]
- // List containing all pending tasks (also used as a stack, as above)
+ // Set containing all pending tasks (also used as a stack, as above).
val allPendingTasks = new ArrayBuffer[Int]
// Tasks that can be speculated. Since these will be a small fraction of total
@@ -144,25 +113,24 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]
- // Did the job fail?
+ // Did the TaskSet fail?
var failed = false
var causeOfFailure = ""
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
- // Map of recent exceptions (identified by string representation and
- // top stack frame) to duplicate count (how many times the same
- // exception has appeared) and time the full exception was
- // printed. This should ideally be an LRU map that can drop old
- // exceptions automatically.
+
+ // Map of recent exceptions (identified by string representation and top stack frame) to
+ // duplicate count (how many times the same exception has appeared) and time the full exception
+ // was printed. This should ideally be an LRU map that can drop old exceptions automatically.
val recentExceptions = HashMap[String, (Int, Long)]()
- // Figure out the current map output tracker generation and set it on all tasks
- val generation = sched.mapOutputTracker.getGeneration
- logDebug("Generation for " + taskSet.id + ": " + generation)
+ // Figure out the current map output tracker epoch and set it on all tasks
+ val epoch = sched.mapOutputTracker.getEpoch
+ logDebug("Epoch for " + taskSet + ": " + epoch)
for (t <- tasks) {
- t.generation = generation
+ t.epoch = epoch
}
// Add all our tasks to the pending lists. We do this in reverse order
@@ -171,166 +139,86 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
addPendingTask(i)
}
- // Note that it follows the hierarchy.
- // if we search for NODE_LOCAL, the output will include PROCESS_LOCAL and
- // if we search for RACK_LOCAL, it will include PROCESS_LOCAL & NODE_LOCAL
- private def findPreferredLocations(
- _taskPreferredLocations: Seq[String],
- scheduler: ClusterScheduler,
- taskLocality: TaskLocality.TaskLocality): HashSet[String] =
- {
- if (TaskLocality.PROCESS_LOCAL == taskLocality) {
- // straight forward comparison ! Special case it.
- val retval = new HashSet[String]()
- scheduler.synchronized {
- for (location <- _taskPreferredLocations) {
- if (scheduler.isExecutorAliveOnHostPort(location)) {
- retval += location
- }
- }
- }
-
- return retval
- }
-
- val taskPreferredLocations = {
- if (TaskLocality.NODE_LOCAL == taskLocality) {
- _taskPreferredLocations
- } else {
- assert (TaskLocality.RACK_LOCAL == taskLocality)
- // Expand set to include all 'seen' rack local hosts.
- // This works since container allocation/management happens within master -
- // so any rack locality information is updated in msater.
- // Best case effort, and maybe sort of kludge for now ... rework it later ?
- val hosts = new HashSet[String]
- _taskPreferredLocations.foreach(h => {
- val rackOpt = scheduler.getRackForHost(h)
- if (rackOpt.isDefined) {
- val hostsOpt = scheduler.getCachedHostsForRack(rackOpt.get)
- if (hostsOpt.isDefined) {
- hosts ++= hostsOpt.get
- }
- }
+ // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
+ val myLocalityLevels = computeValidLocalityLevels()
+ val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
- // Ensure that irrespective of what scheduler says, host is always added !
- hosts += h
- })
+ // Delay scheduling variables: we keep track of our current locality level and the time we
+ // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
+ // We then move down if we manage to launch a "more local" task.
+ var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
+ var lastLaunchTime = clock.getTime() // Time we last launched a task at this level
- hosts
+ /**
+ * Add a task to all the pending-task lists that it should be on. If readding is set, we are
+ * re-adding the task so only include it in each list if it's not already there.
+ */
+ private def addPendingTask(index: Int, readding: Boolean = false) {
+ // Utility method that adds `index` to a list only if readding=false or it's not already there
+ def addTo(list: ArrayBuffer[Int]) {
+ if (!readding || !list.contains(index)) {
+ list += index
}
}
- val retval = new HashSet[String]
- scheduler.synchronized {
- for (prefLocation <- taskPreferredLocations) {
- val aliveLocationsOpt = scheduler.getExecutorsAliveOnHost(Utils.parseHostPort(prefLocation)._1)
- if (aliveLocationsOpt.isDefined) {
- retval ++= aliveLocationsOpt.get
+ var hadAliveLocations = false
+ for (loc <- tasks(index).preferredLocations) {
+ for (execId <- loc.executorId) {
+ if (sched.isExecutorAlive(execId)) {
+ addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
+ hadAliveLocations = true
}
}
- }
-
- retval
- }
-
- // Add a task to all the pending-task lists that it should be on.
- private def addPendingTask(index: Int) {
- // We can infer hostLocalLocations from rackLocalLocations by joining it against
- // tasks(index).preferredLocations (with appropriate hostPort <-> host conversion).
- // But not doing it for simplicity sake. If this becomes a performance issue, modify it.
- val locs = tasks(index).preferredLocations
- val processLocalLocations = findPreferredLocations(locs, sched, TaskLocality.PROCESS_LOCAL)
- val hostLocalLocations = findPreferredLocations(locs, sched, TaskLocality.NODE_LOCAL)
- val rackLocalLocations = findPreferredLocations(locs, sched, TaskLocality.RACK_LOCAL)
-
- if (rackLocalLocations.size == 0) {
- // Current impl ensures this.
- assert (processLocalLocations.size == 0)
- assert (hostLocalLocations.size == 0)
- pendingTasksWithNoPrefs += index
- } else {
-
- // process local locality
- for (hostPort <- processLocalLocations) {
- // DEBUG Code
- Utils.checkHostPort(hostPort)
-
- val hostPortList = pendingTasksForHostPort.getOrElseUpdate(hostPort, ArrayBuffer())
- hostPortList += index
- }
-
- // host locality (includes process local)
- for (hostPort <- hostLocalLocations) {
- // DEBUG Code
- Utils.checkHostPort(hostPort)
-
- val host = Utils.parseHostPort(hostPort)._1
- val hostList = pendingTasksForHost.getOrElseUpdate(host, ArrayBuffer())
- hostList += index
+ if (sched.hasExecutorsAliveOnHost(loc.host)) {
+ addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
+ for (rack <- sched.getRackForHost(loc.host)) {
+ addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
+ }
+ hadAliveLocations = true
}
+ }
- // rack locality (includes process local and host local)
- for (rackLocalHostPort <- rackLocalLocations) {
- // DEBUG Code
- Utils.checkHostPort(rackLocalHostPort)
-
- val rackLocalHost = Utils.parseHostPort(rackLocalHostPort)._1
- val list = pendingRackLocalTasksForHost.getOrElseUpdate(rackLocalHost, ArrayBuffer())
- list += index
- }
+ if (!hadAliveLocations) {
+ // Even though the task might've had preferred locations, all of those hosts or executors
+ // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
+ addTo(pendingTasksWithNoPrefs)
}
- allPendingTasks += index
+ if (!readding) {
+ allPendingTasks += index // No point scanning this whole list to find the old task there
+ }
}
- // Return the pending tasks list for a given host port (process local), or an empty list if
- // there is no map entry for that host
- private def getPendingTasksForHostPort(hostPort: String): ArrayBuffer[Int] = {
- // DEBUG Code
- Utils.checkHostPort(hostPort)
- pendingTasksForHostPort.getOrElse(hostPort, ArrayBuffer())
+ /**
+ * Return the pending tasks list for a given executor ID, or an empty list if
+ * there is no map entry for that host
+ */
+ private def getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int] = {
+ pendingTasksForExecutor.getOrElse(executorId, ArrayBuffer())
}
- // Return the pending tasks list for a given host, or an empty list if
- // there is no map entry for that host
- private def getPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
- val host = Utils.parseHostPort(hostPort)._1
+ /**
+ * Return the pending tasks list for a given host, or an empty list if
+ * there is no map entry for that host
+ */
+ private def getPendingTasksForHost(host: String): ArrayBuffer[Int] = {
pendingTasksForHost.getOrElse(host, ArrayBuffer())
}
- // Return the pending tasks (rack level) list for a given host, or an empty list if
- // there is no map entry for that host
- private def getRackLocalPendingTasksForHost(hostPort: String): ArrayBuffer[Int] = {
- val host = Utils.parseHostPort(hostPort)._1
- pendingRackLocalTasksForHost.getOrElse(host, ArrayBuffer())
- }
-
- // Number of pending tasks for a given host Port (which would be process local)
- override def numPendingTasksForHostPort(hostPort: String): Int = {
- getPendingTasksForHostPort(hostPort).count { index =>
- copiesRunning(index) == 0 && !finished(index)
- }
- }
-
- // Number of pending tasks for a given host (which would be data local)
- override def numPendingTasksForHost(hostPort: String): Int = {
- getPendingTasksForHost(hostPort).count { index =>
- copiesRunning(index) == 0 && !finished(index)
- }
- }
-
- // Number of pending rack local tasks for a given host
- override def numRackLocalPendingTasksForHost(hostPort: String): Int = {
- getRackLocalPendingTasksForHost(hostPort).count { index =>
- copiesRunning(index) == 0 && !finished(index)
- }
+ /**
+ * Return the pending rack-local task list for a given rack, or an empty list if
+ * there is no map entry for that rack
+ */
+ private def getPendingTasksForRack(rack: String): ArrayBuffer[Int] = {
+ pendingTasksForRack.getOrElse(rack, ArrayBuffer())
}
-
- // Dequeue a pending task from the given list and return its index.
- // Return None if the list is empty.
- // This method also cleans up any tasks in the list that have already
- // been launched, since we want that to happen lazily.
+ /**
+ * Dequeue a pending task from the given list and return its index.
+ * Return None if the list is empty.
+ * This method also cleans up any tasks in the list that have already
+ * been launched, since we want that to happen lazily.
+ */
private def findTaskFromList(list: ArrayBuffer[Int]): Option[Int] = {
while (!list.isEmpty) {
val index = list.last
@@ -342,191 +230,158 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
return None
}
- // Return a speculative task for a given host if any are available. The task should not have an
- // attempt running on this host, in case the host is slow. In addition, if locality is set, the
- // task must have a preference for this host/rack/no preferred locations at all.
- private def findSpeculativeTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
+ /** Check whether a task is currently running an attempt on a given host */
+ private def hasAttemptOnHost(taskIndex: Int, host: String): Boolean = {
+ !taskAttempts(taskIndex).exists(_.host == host)
+ }
- assert (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL))
+ /**
+ * Return a speculative task for a given executor if any are available. The task should not have
+ * an attempt running on this host, in case the host is slow. In addition, the task should meet
+ * the given locality constraint.
+ */
+ private def findSpeculativeTask(execId: String, host: String, locality: TaskLocality.Value)
+ : Option[(Int, TaskLocality.Value)] =
+ {
speculatableTasks.retain(index => !finished(index)) // Remove finished tasks from set
- if (speculatableTasks.size > 0) {
- val localTask = speculatableTasks.find { index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
- TaskLocality.NODE_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- (locations.size == 0 || locations.contains(hostPort)) && !attemptLocs.contains(hostPort)
+ if (!speculatableTasks.isEmpty) {
+ // Check for process-local or preference-less tasks; note that tasks can be process-local
+ // on multiple nodes when we replicate cached blocks, as in Spark Streaming
+ for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+ val prefs = tasks(index).preferredLocations
+ val executors = prefs.flatMap(_.executorId)
+ if (prefs.size == 0 || executors.contains(execId)) {
+ speculatableTasks -= index
+ return Some((index, TaskLocality.PROCESS_LOCAL))
+ }
}
- if (localTask != None) {
- speculatableTasks -= localTask.get
- return localTask
+ // Check for node-local tasks
+ if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+ for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+ val locations = tasks(index).preferredLocations.map(_.host)
+ if (locations.contains(host)) {
+ speculatableTasks -= index
+ return Some((index, TaskLocality.NODE_LOCAL))
+ }
+ }
}
- // check for rack locality
+ // Check for rack-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
- val rackTask = speculatableTasks.find { index =>
- val locations = findPreferredLocations(tasks(index).preferredLocations, sched,
- TaskLocality.RACK_LOCAL)
- val attemptLocs = taskAttempts(index).map(_.hostPort)
- locations.contains(hostPort) && !attemptLocs.contains(hostPort)
- }
-
- if (rackTask != None) {
- speculatableTasks -= rackTask.get
- return rackTask
+ for (rack <- sched.getRackForHost(host)) {
+ for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+ val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
+ if (racks.contains(rack)) {
+ speculatableTasks -= index
+ return Some((index, TaskLocality.RACK_LOCAL))
+ }
+ }
}
}
- // Any task ...
+ // Check for non-local tasks
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
- // Check for attemptLocs also ?
- val nonLocalTask = speculatableTasks.find { i =>
- !taskAttempts(i).map(_.hostPort).contains(hostPort)
- }
- if (nonLocalTask != None) {
- speculatableTasks -= nonLocalTask.get
- return nonLocalTask
+ for (index <- speculatableTasks if !hasAttemptOnHost(index, host)) {
+ speculatableTasks -= index
+ return Some((index, TaskLocality.ANY))
}
}
}
+
return None
}
- // Dequeue a pending task for a given node and return its index.
- // If localOnly is set to false, allow non-local tasks as well.
- private def findTask(hostPort: String, locality: TaskLocality.TaskLocality): Option[Int] = {
- val processLocalTask = findTaskFromList(getPendingTasksForHostPort(hostPort))
- if (processLocalTask != None) {
- return processLocalTask
+ /**
+ * Dequeue a pending task for a given node and return its index and locality level.
+ * Only search for tasks matching the given locality constraint.
+ */
+ private def findTask(execId: String, host: String, locality: TaskLocality.Value)
+ : Option[(Int, TaskLocality.Value)] =
+ {
+ for (index <- findTaskFromList(getPendingTasksForExecutor(execId))) {
+ return Some((index, TaskLocality.PROCESS_LOCAL))
}
- val localTask = findTaskFromList(getPendingTasksForHost(hostPort))
- if (localTask != None) {
- return localTask
+ if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
+ for (index <- findTaskFromList(getPendingTasksForHost(host))) {
+ return Some((index, TaskLocality.NODE_LOCAL))
+ }
}
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
- val rackLocalTask = findTaskFromList(getRackLocalPendingTasksForHost(hostPort))
- if (rackLocalTask != None) {
- return rackLocalTask
+ for {
+ rack <- sched.getRackForHost(host)
+ index <- findTaskFromList(getPendingTasksForRack(rack))
+ } {
+ return Some((index, TaskLocality.RACK_LOCAL))
}
}
- // Look for no pref tasks AFTER rack local tasks - this has side effect that we will get to
- // failed tasks later rather than sooner.
- // TODO: That code path needs to be revisited (adding to no prefs list when host:port goes down).
- val noPrefTask = findTaskFromList(pendingTasksWithNoPrefs)
- if (noPrefTask != None) {
- return noPrefTask
+ // Look for no-pref tasks after rack-local tasks since they can run anywhere.
+ for (index <- findTaskFromList(pendingTasksWithNoPrefs)) {
+ return Some((index, TaskLocality.PROCESS_LOCAL))
}
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
- val nonLocalTask = findTaskFromList(allPendingTasks)
- if (nonLocalTask != None) {
- return nonLocalTask
+ for (index <- findTaskFromList(allPendingTasks)) {
+ return Some((index, TaskLocality.ANY))
}
}
// Finally, if all else has failed, find a speculative task
- return findSpeculativeTask(hostPort, locality)
- }
-
- private def isProcessLocalLocation(task: Task[_], hostPort: String): Boolean = {
- Utils.checkHostPort(hostPort)
-
- val locs = task.preferredLocations
-
- locs.contains(hostPort)
- }
-
- private def isHostLocalLocation(task: Task[_], hostPort: String): Boolean = {
- val locs = task.preferredLocations
-
- // If no preference, consider it as host local
- if (locs.isEmpty) return true
-
- val host = Utils.parseHostPort(hostPort)._1
- locs.find(h => Utils.parseHostPort(h)._1 == host).isDefined
- }
-
- // Does a host count as a rack local preferred location for a task?
- // (assumes host is NOT preferred location).
- // This is true if either the task has preferred locations and this host is one, or it has
- // no preferred locations (in which we still count the launch as preferred).
- private def isRackLocalLocation(task: Task[_], hostPort: String): Boolean = {
-
- val locs = task.preferredLocations
-
- val preferredRacks = new HashSet[String]()
- for (preferredHost <- locs) {
- val rack = sched.getRackForHost(preferredHost)
- if (None != rack) preferredRacks += rack.get
- }
-
- if (preferredRacks.isEmpty) return false
-
- val hostRack = sched.getRackForHost(hostPort)
-
- return None != hostRack && preferredRacks.contains(hostRack.get)
+ return findSpeculativeTask(execId, host, locality)
}
- // Respond to an offer of a single slave from the scheduler by finding a task
- override def slaveOffer(
+ /**
+ * Respond to an offer of a single slave from the scheduler by finding a task
+ */
+ override def resourceOffer(
execId: String,
- hostPort: String,
- availableCpus: Double,
- overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription] =
{
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
- // If explicitly specified, use that
- val locality = if (overrideLocality != null) overrideLocality else {
- // expand only if we have waited for more than LOCALITY_WAIT for a host local task ...
- val time = System.currentTimeMillis
- if (time - lastPreferredLaunchTime < LOCALITY_WAIT) {
- TaskLocality.NODE_LOCAL
- } else {
- TaskLocality.ANY
- }
+ val curTime = clock.getTime()
+
+ var allowedLocality = getAllowedLocalityLevel(curTime)
+ if (allowedLocality > maxLocality) {
+ allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
}
- findTask(hostPort, locality) match {
- case Some(index) => {
- // Found a task; do some bookkeeping and return a Mesos task for it
+ findTask(execId, host, allowedLocality) match {
+ case Some((index, taskLocality)) => {
+ // Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Figure out whether this should count as a preferred launch
- val taskLocality =
- if (isProcessLocalLocation(task, hostPort)) TaskLocality.PROCESS_LOCAL
- else if (isHostLocalLocation(task, hostPort)) TaskLocality.NODE_LOCAL
- else if (isRackLocalLocation(task, hostPort)) TaskLocality.RACK_LOCAL
- else TaskLocality.ANY
- val prefStr = taskLocality.toString
logInfo("Starting task %s:%d as TID %s on slave %s: %s (%s)".format(
- taskSet.id, index, taskId, execId, hostPort, prefStr))
+ taskSet.id, index, taskId, execId, host, taskLocality))
// Do various bookkeeping
copiesRunning(index) += 1
- val time = System.currentTimeMillis
- val info = new TaskInfo(taskId, index, time, execId, hostPort, taskLocality)
+ val info = new TaskInfo(taskId, index, curTime, execId, host, taskLocality)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
- if (taskLocality == TaskLocality.PROCESS_LOCAL || taskLocality == TaskLocality.NODE_LOCAL) {
- lastPreferredLaunchTime = time
- }
+ // Update our locality level for delay scheduling
+ currentLocalityIndex = getLocalityIndex(taskLocality)
+ lastLaunchTime = curTime
// Serialize and return the task
- val startTime = System.currentTimeMillis
+ val startTime = clock.getTime()
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
// we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
- val timeTaken = System.currentTimeMillis - startTime
+ val timeTaken = clock.getTime() - startTime
increaseRunningTasks(1)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
taskSet.id, index, serializedTask.limit, timeTaken))
val taskName = "task %s:%d".format(taskSet.id, index)
if (taskAttempts(index).size == 1)
taskStarted(task,info)
- return Some(new TaskDescription(taskId, execId, taskName, serializedTask))
+ return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
case _ =>
}
@@ -534,6 +389,35 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
return None
}
+ /**
+ * Get the level we can launch tasks according to delay scheduling, based on current wait time.
+ */
+ private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
+ while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
+ currentLocalityIndex < myLocalityLevels.length - 1)
+ {
+ // Jump to the next locality level, and remove our waiting time for the current one since
+ // we don't want to count it again on the next one
+ lastLaunchTime += localityWaits(currentLocalityIndex)
+ currentLocalityIndex += 1
+ }
+ myLocalityLevels(currentLocalityIndex)
+ }
+
+ /**
+ * Find the index in myLocalityLevels for a given locality. This is also designed to work with
+ * localities that are not in myLocalityLevels (in case we somehow get those) by returning the
+ * next-biggest level we have. Uses the fact that the last value in myLocalityLevels is ANY.
+ */
+ def getLocalityIndex(locality: TaskLocality.TaskLocality): Int = {
+ var index = 0
+ while (locality > myLocalityLevels(index)) {
+ index += 1
+ }
+ index
+ }
+
+ /** Called by cluster scheduler when one of our tasks changes state */
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
@@ -566,7 +450,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
if (!finished(index)) {
tasksFinished += 1
logInfo("Finished TID %s in %d ms on %s (progress: %d/%d)".format(
- tid, info.duration, info.hostPort, tasksFinished, numTasks))
+ tid, info.duration, info.host, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
try {
val result = ser.deserialize[TaskResult[_]](serializedData)
@@ -626,7 +510,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
case ef: ExceptionFailure =>
sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null))
val key = ef.description
- val now = System.currentTimeMillis
+ val now = clock.getTime()
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
val (dupCount, printTime) = recentExceptions(key)
@@ -698,44 +582,33 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
}
}
- // TODO(xiajunluan): for now we just find Pool not TaskSetManager
- // we can extend this function in future if needed
override def getSchedulableByName(name: String): Schedulable = {
return null
}
- override def addSchedulable(schedulable:Schedulable) {
- // nothing
- }
+ override def addSchedulable(schedulable: Schedulable) {}
- override def removeSchedulable(schedulable:Schedulable) {
- // nothing
- }
+ override def removeSchedulable(schedulable: Schedulable) {}
override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
- var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+ var sortedTaskSetQueue = ArrayBuffer[TaskSetManager](this)
sortedTaskSetQueue += this
return sortedTaskSetQueue
}
- override def executorLost(execId: String, hostPort: String) {
+ /** Called by cluster scheduler when an executor is lost so we can re-enqueue our tasks */
+ override def executorLost(execId: String, host: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
- // If some task has preferred locations only on hostname, and there are no more executors there,
- // put it in the no-prefs list to avoid the wait from delay scheduling
-
- // host local tasks - should we push this to rack local or no pref list ? For now, preserving
- // behavior and moving to no prefs list. Note, this was done due to impliations related to
- // 'waiting' for data local tasks, etc.
- // Note: NOT checking process local list - since host local list is super set of that. We need
- // to ad to no prefs only if there is no host local node for the task (not if there is no
- // process local node for the task)
- for (index <- getPendingTasksForHost(Utils.parseHostPort(hostPort)._1)) {
- val newLocs = findPreferredLocations(
- tasks(index).preferredLocations, sched, TaskLocality.NODE_LOCAL)
- if (newLocs.isEmpty) {
- pendingTasksWithNoPrefs += index
- }
+ // Re-enqueue pending tasks for this host based on the status of the cluster -- for example, a
+ // task that used to have locations on only this host might now go to the no-prefs list. Note
+ // that it's okay if we add a task to the same queue twice (if it had multiple preferred
+ // locations), because findTaskFromList will skip already-running tasks.
+ for (index <- getPendingTasksForExecutor(execId)) {
+ addPendingTask(index, readding=true)
+ }
+ for (index <- getPendingTasksForHost(host)) {
+ addPendingTask(index, readding=true)
}
// Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage
@@ -775,7 +648,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
if (tasksFinished >= minFinishedForSpeculation) {
- val time = System.currentTimeMillis()
+ val time = clock.getTime()
val durations = taskInfos.values.filter(_.successful).map(_.duration).toArray
Arrays.sort(durations)
val medianDuration = durations(min((0.5 * numTasks).round.toInt, durations.size - 1))
@@ -789,7 +662,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
!speculatableTasks.contains(index)) {
logInfo(
"Marking task %s:%d (on %s) as speculatable because it ran more than %.0f ms".format(
- taskSet.id, index, info.hostPort, threshold))
+ taskSet.id, index, info.host, threshold))
speculatableTasks += index
foundTasks = true
}
@@ -801,4 +674,39 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
override def hasPendingTasks(): Boolean = {
numTasks > 0 && tasksFinished < numTasks
}
+
+ private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
+ val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ level match {
+ case TaskLocality.PROCESS_LOCAL =>
+ System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ case TaskLocality.NODE_LOCAL =>
+ System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ case TaskLocality.RACK_LOCAL =>
+ System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ case TaskLocality.ANY =>
+ 0L
+ }
+ }
+
+ /**
+ * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
+ * added to queues using addPendingTask.
+ */
+ private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
+ import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
+ val levels = new ArrayBuffer[TaskLocality.TaskLocality]
+ if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
+ levels += PROCESS_LOCAL
+ }
+ if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
+ levels += NODE_LOCAL
+ }
+ if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+ levels += RACK_LOCAL
+ }
+ levels += ANY
+ logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
+ levels.toArray
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 55d6c0a47e..42c3b4a6cf 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -77,7 +77,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def executorAdded(executorId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
- executorId, hostPort, cores, Utils.memoryMegabytesToString(memory)))
+ executorId, hostPort, cores, Utils.megabytesToString(memory)))
}
override def executorRemoved(executorId: String, message: String, exitStatus: Option[Int]) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 075a7cbf7e..3203be1029 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -26,6 +26,7 @@ import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
import akka.util.Duration
+import akka.util.duration._
import spark.{Utils, SparkException, Logging, TaskState}
import spark.scheduler.cluster.StandaloneClusterMessages._
@@ -37,15 +38,15 @@ import spark.scheduler.cluster.StandaloneClusterMessages._
*/
private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
- extends SchedulerBackend with Logging {
-
+ extends SchedulerBackend with Logging
+{
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed
var totalCoreCount = new AtomicInteger(0)
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor {
private val executorActor = new HashMap[String, ActorRef]
private val executorAddress = new HashMap[String, Address]
- private val executorHostPort = new HashMap[String, String]
+ private val executorHost = new HashMap[String, String]
private val freeCores = new HashMap[String, Int]
private val actorToExecutorId = new HashMap[ActorRef, String]
private val addressToExecutorId = new HashMap[Address, String]
@@ -53,6 +54,10 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
override def preStart() {
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+
+ // Periodically revive offers to allow delay scheduling to work
+ val reviveInterval = System.getProperty("spark.scheduler.revive.interval", "1000").toLong
+ context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
}
def receive = {
@@ -65,7 +70,7 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
sender ! RegisteredExecutor(sparkProperties)
context.watch(sender)
executorActor(executorId) = sender
- executorHostPort(executorId) = hostPort
+ executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
executorAddress(executorId) = sender.path.address
actorToExecutorId(sender) = executorId
@@ -105,13 +110,13 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
// Make fake resource offers on all executors
def makeOffers() {
launchTasks(scheduler.resourceOffers(
- executorHostPort.toArray.map {case (id, hostPort) => new WorkerOffer(id, hostPort, freeCores(id))}))
+ executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
// Make fake resource offers on just one executor
def makeOffers(executorId: String) {
launchTasks(scheduler.resourceOffers(
- Seq(new WorkerOffer(executorId, executorHostPort(executorId), freeCores(executorId)))))
+ Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
@@ -130,9 +135,8 @@ class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Actor
actorToExecutorId -= executorActor(executorId)
addressToExecutorId -= executorAddress(executorId)
executorActor -= executorId
- executorHostPort -= executorId
+ executorHost -= executorId
freeCores -= executorId
- executorHostPort -= executorId
totalCoreCount.addAndGet(-numCores)
scheduler.executorLost(executorId, SlaveLost(reason))
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
index 761fdf6919..187553233f 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskDescription.scala
@@ -24,6 +24,7 @@ private[spark] class TaskDescription(
val taskId: Long,
val executorId: String,
val name: String,
+ val index: Int, // Index within this task's TaskSet
_serializedTask: ByteBuffer)
extends Serializable {
@@ -31,4 +32,6 @@ private[spark] class TaskDescription(
private val buffer = new SerializableBuffer(_serializedTask)
def serializedTask: ByteBuffer = buffer.value
+
+ override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index c693b722ac..c2c5522686 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -28,11 +28,9 @@ class TaskInfo(
val index: Int,
val launchTime: Long,
val executorId: String,
- val hostPort: String,
+ val host: String,
val taskLocality: TaskLocality.TaskLocality) {
- Utils.checkHostPort(hostPort, "Expected hostport")
-
var finishTime: Long = 0
var failed = false
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala b/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala
new file mode 100644
index 0000000000..1c33e41f87
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskLocality.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+
+private[spark] object TaskLocality
+ extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
+{
+ // process local is expected to be used ONLY within tasksetmanager for now.
+ val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
+
+ type TaskLocality = Value
+
+ def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean = {
+ condition <= constraint
+ }
+}
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 1a92a5ed6f..0248830b7a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -22,6 +22,15 @@ import java.nio.ByteBuffer
import spark.TaskState.TaskState
import spark.scheduler.TaskSet
+/**
+ * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
+ * each task and is responsible for retries on failure and locality. The main interfaces to it
+ * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
+ * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
+ *
+ * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
+ * (e.g. its event handlers). It should not be called from other threads.
+ */
private[spark] trait TaskSetManager extends Schedulable {
def schedulableQueue = null
@@ -29,17 +38,12 @@ private[spark] trait TaskSetManager extends Schedulable {
def taskSet: TaskSet
- def slaveOffer(
+ def resourceOffer(
execId: String,
- hostPort: String,
- availableCpus: Double,
- overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription]
-
- def numPendingTasksForHostPort(hostPort: String): Int
-
- def numRackLocalPendingTasksForHost(hostPort: String): Int
-
- def numPendingTasksForHost(hostPort: String): Int
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription]
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 06d1203f70..1d09bd9b03 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -21,5 +21,4 @@ package spark.scheduler.cluster
* Represents free resources available on an executor.
*/
private[spark]
-class WorkerOffer(val executorId: String, val hostPort: String, val cores: Int) {
-}
+class WorkerOffer(val executorId: String, val host: String, val cores: Int)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index 6c43928bc8..5be4dbd9f0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -141,7 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
for (manager <- sortedTaskSetQueue) {
do {
launchTask = false
- manager.slaveOffer(null, null, freeCpuCores) match {
+ manager.resourceOffer(null, null, freeCpuCores, null) match {
case Some(task) =>
tasks += task
taskIdToTaskSetId(task.taskId) = manager.taskSet.id
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index c38eeb9e11..e237f289e3 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -98,14 +98,15 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- override def slaveOffer(
+ override def resourceOffer(
execId: String,
- hostPort: String,
- availableCpus: Double,
- overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] =
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription] =
{
SparkEnv.set(sched.env)
- logDebug("availableCpus:%d,numFinished:%d,numTasks:%d".format(
+ logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
availableCpus.toInt, numFinished, numTasks))
if (availableCpus > 0 && numFinished < numTasks) {
findTask() match {
@@ -124,25 +125,13 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
copiesRunning(index) += 1
increaseRunningTasks(1)
taskStarted(task, info)
- return Some(new TaskDescription(taskId, null, taskName, bytes))
+ return Some(new TaskDescription(taskId, null, taskName, index, bytes))
case None => {}
}
}
return None
}
- override def numPendingTasksForHostPort(hostPort: String): Int = {
- return 0
- }
-
- override def numRackLocalPendingTasksForHost(hostPort :String): Int = {
- return 0
- }
-
- override def numPendingTasksForHost(hostPort: String): Int = {
- return 0
- }
-
override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
SparkEnv.set(env)
state match {
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
index 07e3db30fe..568783d893 100644
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
@@ -111,7 +111,7 @@ object BlockFetcherIterator {
protected def sendRequest(req: FetchRequest) {
logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.memoryBytesToString(req.size), req.address.hostPort))
+ req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
val cmId = new ConnectionManagerId(req.address.host, req.address.port)
val blockMessageArray = new BlockMessageArray(req.blocks.map {
case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
@@ -310,7 +310,7 @@ object BlockFetcherIterator {
}
logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host))
+ req.blocks.size, Utils.bytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
val cpier = new ShuffleCopier
cpier.getBlocks(cmId, req.blocks, putResult)
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 3a72474419..2a6ec2a55d 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -1004,43 +1004,43 @@ private[spark] object BlockManager extends Logging {
}
}
- def blockIdsToExecutorLocations(blockIds: Array[String], env: SparkEnv, blockManagerMaster: BlockManagerMaster = null): HashMap[String, List[String]] = {
+ def blockIdsToBlockManagers(
+ blockIds: Array[String],
+ env: SparkEnv,
+ blockManagerMaster: BlockManagerMaster = null)
+ : Map[String, Seq[BlockManagerId]] =
+ {
// env == null and blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
- val locationBlockIds: Seq[Seq[BlockManagerId]] =
- if (env != null) {
- env.blockManager.getLocationBlockIds(blockIds)
- } else {
- blockManagerMaster.getLocations(blockIds)
- }
+ val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
+ env.blockManager.getLocationBlockIds(blockIds)
+ } else {
+ blockManagerMaster.getLocations(blockIds)
+ }
- // Convert from block master locations to executor locations (we need that for task scheduling)
- val executorLocations = new HashMap[String, List[String]]()
+ val blockManagers = new HashMap[String, Seq[BlockManagerId]]
for (i <- 0 until blockIds.length) {
- val blockId = blockIds(i)
- val blockLocations = locationBlockIds(i)
-
- val executors = new HashSet[String]()
-
- if (env != null) {
- for (bkLocation <- blockLocations) {
- val executorHostPort = env.resolveExecutorIdToHostPort(bkLocation.executorId, bkLocation.host)
- executors += executorHostPort
- // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
- }
- } else {
- // Typically while testing, etc - revert to simply using host.
- for (bkLocation <- blockLocations) {
- executors += bkLocation.host
- // logInfo("bkLocation = " + bkLocation + ", executorHostPort = " + executorHostPort)
- }
- }
-
- executorLocations.put(blockId, executors.toSeq.toList)
+ blockManagers(blockIds(i)) = blockLocations(i)
}
+ blockManagers.toMap
+ }
- executorLocations
+ def blockIdsToExecutorIds(
+ blockIds: Array[String],
+ env: SparkEnv,
+ blockManagerMaster: BlockManagerMaster = null)
+ : Map[String, Seq[String]] =
+ {
+ blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.executorId))
}
+ def blockIdsToHosts(
+ blockIds: Array[String],
+ env: SparkEnv,
+ blockManagerMaster: BlockManagerMaster = null)
+ : Map[String, Seq[String]] =
+ {
+ blockIdsToBlockManagers(blockIds, env, blockManagerMaster).mapValues(s => s.map(_.host))
+ }
}
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 011bb6b83d..2a2e178550 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -332,7 +332,7 @@ object BlockManagerMasterActor {
private val _blocks = new JHashMap[String, BlockStatus]
logInfo("Registering block manager %s with %s RAM".format(
- blockManagerId.hostPort, Utils.memoryBytesToString(maxMem)))
+ blockManagerId.hostPort, Utils.bytesToString(maxMem)))
def updateLastSeenMs() {
_lastSeenMs = System.currentTimeMillis()
@@ -358,12 +358,12 @@ object BlockManagerMasterActor {
if (storageLevel.useMemory) {
_remainingMem -= memSize
logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ Utils.bytesToString(_remainingMem)))
}
if (storageLevel.useDisk) {
logInfo("Added %s on disk on %s (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
} else if (_blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
@@ -372,12 +372,12 @@ object BlockManagerMasterActor {
if (blockStatus.storageLevel.useMemory) {
_remainingMem += blockStatus.memSize
logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(memSize),
- Utils.memoryBytesToString(_remainingMem)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
+ Utils.bytesToString(_remainingMem)))
}
if (blockStatus.storageLevel.useDisk) {
logInfo("Removed %s on %s on disk (size: %s)".format(
- blockId, blockManagerId.hostPort, Utils.memoryBytesToString(diskSize)))
+ blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
}
}
}
diff --git a/core/src/main/scala/spark/storage/DiskStore.scala b/core/src/main/scala/spark/storage/DiskStore.scala
index 3ebfe173b1..b14497157e 100644
--- a/core/src/main/scala/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/spark/storage/DiskStore.scala
@@ -147,7 +147,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
- blockId, Utils.memoryBytesToString(bytes.limit), (finishTime - startTime)))
+ blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
}
private def getFileBytes(file: File): ByteBuffer = {
@@ -181,7 +181,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val timeTaken = System.currentTimeMillis - startTime
logDebug("Block %s stored as %s file on disk in %d ms".format(
- blockId, Utils.memoryBytesToString(length), timeTaken))
+ blockId, Utils.bytesToString(length), timeTaken))
if (returnValues) {
// Return a byte buffer for the contents of the file
diff --git a/core/src/main/scala/spark/storage/MemoryStore.scala b/core/src/main/scala/spark/storage/MemoryStore.scala
index b5a86b85a7..5a51f5cf31 100644
--- a/core/src/main/scala/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/spark/storage/MemoryStore.scala
@@ -38,7 +38,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
// blocks from the memory store.
private val putLock = new Object()
- logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
+ logInfo("MemoryStore started with capacity %s.".format(Utils.bytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
@@ -164,10 +164,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
} else {
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
- blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
+ blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
}
true
} else {
diff --git a/core/src/main/scala/spark/storage/StorageUtils.scala b/core/src/main/scala/spark/storage/StorageUtils.scala
index 2aeed4ea3c..123b8f6345 100644
--- a/core/src/main/scala/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/spark/storage/StorageUtils.scala
@@ -42,9 +42,9 @@ case class RDDInfo(id: Int, name: String, storageLevel: StorageLevel,
numCachedPartitions: Int, numPartitions: Int, memSize: Long, diskSize: Long)
extends Ordered[RDDInfo] {
override def toString = {
- import Utils.memoryBytesToString
+ import Utils.bytesToString
"RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; DiskSize: %s".format(name, id,
- storageLevel.toString, numCachedPartitions, numPartitions, memoryBytesToString(memSize), memoryBytesToString(diskSize))
+ storageLevel.toString, numCachedPartitions, numPartitions, bytesToString(memSize), bytesToString(diskSize))
}
override def compare(that: RDDInfo) = {
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index f66fe39905..ba58f35729 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -21,10 +21,9 @@ import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
+import scala.util.parsing.json.JSONType
import scala.xml.Node
-import net.liftweb.json.{JValue, pretty, render}
-
import org.eclipse.jetty.server.{Server, Request, Handler}
import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler}
import org.eclipse.jetty.util.thread.QueuedThreadPool
@@ -39,8 +38,8 @@ private[spark] object JettyUtils extends Logging {
type Responder[T] = HttpServletRequest => T
// Conversions from various types of Responder's to jetty Handlers
- implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler =
- createHandler(responder, "text/json", (in: JValue) => pretty(render(in)))
+ implicit def jsonResponderToHandler(responder: Responder[JSONType]): Handler =
+ createHandler(responder, "text/json", (in: JSONType) => in.toString)
implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler =
createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString)
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 1fd5a0989e..23ded44ba3 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -30,7 +30,7 @@ import spark.ui.JettyUtils._
/** Top level user interface for Spark */
private[spark] class SparkUI(sc: SparkContext) extends Logging {
- val host = Utils.localHostName()
+ val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
var server: Option[Server] = None
@@ -58,9 +58,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
server = Some(srv)
boundPort = Some(usedPort)
} catch {
- case e: Exception =>
- logError("Failed to create Spark JettyUtils", e)
- System.exit(1)
+ case e: Exception =>
+ logError("Failed to create Spark JettyUtils", e)
+ System.exit(1)
}
}
@@ -82,6 +82,6 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
}
private[spark] object SparkUI {
- val DEFAULT_PORT = "33000"
+ val DEFAULT_PORT = "3030"
val STATIC_RESOURCE_DIR = "spark/ui/static"
}
diff --git a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
index 28f6b3211c..f97860013e 100644
--- a/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/spark/ui/exec/ExecutorsUI.scala
@@ -46,10 +46,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<td>{kv(1)}</td>
<td>{kv(2)}</td>
<td sorttable_customkey={kv(3)}>
- {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
+ {Utils.bytesToString(kv(3).toLong)} / {Utils.bytesToString(kv(4).toLong)}
</td>
<td sorttable_customkey={kv(5)}>
- {Utils.memoryBytesToString(kv(5).toLong)}
+ {Utils.bytesToString(kv(5).toLong)}
</td>
<td>{kv(6)}</td>
<td>{kv(7)}</td>
@@ -66,9 +66,9 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
<div class="span12">
<ul class="unstyled">
<li><strong>Memory:</strong>
- {Utils.memoryBytesToString(memUsed)} Used
- ({Utils.memoryBytesToString(maxMem)} Total) </li>
- <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+ {Utils.bytesToString(memUsed)} Used
+ ({Utils.bytesToString(maxMem)} Total) </li>
+ <li><strong>Disk:</strong> {Utils.bytesToString(diskSpaceUsed)} Used </li>
</ul>
</div>
</div>
diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala
index f2a6f4f303..6948ea4dd9 100644
--- a/core/src/main/scala/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala
@@ -70,13 +70,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
{if (hasShuffleRead)
<li>
<strong>Shuffle read: </strong>
- {Utils.memoryBytesToString(shuffleReadBytes)}
+ {Utils.bytesToString(shuffleReadBytes)}
</li>
}
{if (hasShuffleWrite)
<li>
<strong>Shuffle write: </strong>
- {Utils.memoryBytesToString(shuffleWriteBytes)}
+ {Utils.bytesToString(shuffleWriteBytes)}
</li>
}
</ul>
@@ -105,7 +105,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
ms => parent.formatDuration(ms.toLong))
def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+ Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
val shuffleReadSizes = validTasks.map {
case(info, metrics, exception) =>
@@ -156,7 +156,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<td>{info.taskId}</td>
<td>{info.status}</td>
<td>{info.taskLocality}</td>
- <td>{info.hostPort}</td>
+ <td>{info.host}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
@@ -166,11 +166,11 @@ private[spark] class StagePage(parent: JobProgressUI) {
</td>
{if (shuffleRead) {
<td>{metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
- Utils.memoryBytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
+ Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
}}
{if (shuffleWrite) {
<td>{metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
- Utils.memoryBytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
+ Utils.bytesToString(s.shuffleBytesWritten)}.getOrElse("")}</td>
}}
<td>{exception.map(e =>
<span>
diff --git a/core/src/main/scala/spark/ui/jobs/StageTable.scala b/core/src/main/scala/spark/ui/jobs/StageTable.scala
index 96bcc62480..b31f4abc26 100644
--- a/core/src/main/scala/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/spark/ui/jobs/StageTable.scala
@@ -64,11 +64,11 @@ private[spark] class StageTable(val stages: Seq[Stage], val parent: JobProgressU
val shuffleRead = listener.stageToShuffleRead.getOrElse(s.id, 0L) match {
case 0 => ""
- case b => Utils.memoryBytesToString(b)
+ case b => Utils.bytesToString(b)
}
val shuffleWrite = listener.stageToShuffleWrite.getOrElse(s.id, 0L) match {
case 0 => ""
- case b => Utils.memoryBytesToString(b)
+ case b => Utils.bytesToString(b)
}
val startedTasks = listener.stageToTasksActive.getOrElse(s.id, HashSet[TaskInfo]()).size
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
index f76192eba8..0751f9e8f9 100644
--- a/core/src/main/scala/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -58,8 +58,8 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td>
- <td>{Utils.memoryBytesToString(rdd.memSize)}</td>
- <td>{Utils.memoryBytesToString(rdd.diskSize)}</td>
+ <td>{Utils.bytesToString(rdd.memSize)}</td>
+ <td>{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
}
}
diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala
index 5fce1ea59b..f0b711e6ec 100644
--- a/core/src/main/scala/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala
@@ -72,11 +72,11 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
</li>
<li>
<strong>Memory Size:</strong>
- {Utils.memoryBytesToString(rddInfo.memSize)}
+ {Utils.bytesToString(rddInfo.memSize)}
</li>
<li>
<strong>Disk Size:</strong>
- {Utils.memoryBytesToString(rddInfo.diskSize)}
+ {Utils.bytesToString(rddInfo.diskSize)}
</li>
</ul>
</div>
@@ -107,10 +107,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
{block.storageLevel.description}
</td>
<td sorttable_customkey={block.memSize.toString}>
- {Utils.memoryBytesToString(block.memSize)}
+ {Utils.bytesToString(block.memSize)}
</td>
<td sorttable_customkey={block.diskSize.toString}>
- {Utils.memoryBytesToString(block.diskSize)}
+ {Utils.bytesToString(block.diskSize)}
</td>
<td>
{locations.map(l => <span>{l}<br/></span>)}
@@ -123,10 +123,10 @@ private[spark] class RDDPage(parent: BlockManagerUI) {
<tr>
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
<td>
- {Utils.memoryBytesToString(status.memUsed(prefix))}
- ({Utils.memoryBytesToString(status.memRemaining)} Remaining)
+ {Utils.bytesToString(status.memUsed(prefix))}
+ ({Utils.bytesToString(status.memRemaining)} Remaining)
</td>
- <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
+ <td>{Utils.bytesToString(status.diskUsed(prefix))}</td>
</tr>
}
}
diff --git a/core/src/main/scala/spark/util/Clock.scala b/core/src/main/scala/spark/util/Clock.scala
new file mode 100644
index 0000000000..aa71a5b442
--- /dev/null
+++ b/core/src/main/scala/spark/util/Clock.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+/**
+ * An interface to represent clocks, so that they can be mocked out in unit tests.
+ */
+private[spark] trait Clock {
+ def getTime(): Long
+}
+
+private[spark] object SystemClock extends Clock {
+ def getTime(): Long = System.currentTimeMillis()
+}
diff --git a/core/src/main/scala/spark/util/MutablePair.scala b/core/src/main/scala/spark/util/MutablePair.scala
new file mode 100644
index 0000000000..3063806e83
--- /dev/null
+++ b/core/src/main/scala/spark/util/MutablePair.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+
+/**
+ * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
+ * minimize object allocation.
+ *
+ * @param _1 Element 1 of this MutablePair
+ * @param _2 Element 2 of this MutablePair
+ */
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
+ @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+ (var _1: T1, var _2: T2)
+ extends Product2[T1, T2]
+{
+ override def toString = "(" + _1 + "," + _2 + ")"
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[T1, T2]]
+}
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index a84c89e3c9..966dede2be 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testCheckpointing(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 4ab271de1a..c337c49268 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -748,7 +748,7 @@ public class JavaAPISuite implements Serializable {
}
};
- JavaRDD<Integer> sizes = rdd1.zipPartitions(sizesFn, rdd2);
+ JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
}
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index ce6cec0451..c21f3331d0 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -112,22 +112,22 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
masterTracker.registerShuffle(10, 1)
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
- masterTracker.incrementGeneration()
- slaveTracker.updateGeneration(masterTracker.getGeneration)
+ masterTracker.incrementEpoch()
+ slaveTracker.updateEpoch(masterTracker.getEpoch)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
// failure should be cached
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
index b102eaf4e6..328b3b5497 100644
--- a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
import com.google.common.io.Files
-
-import spark.rdd.ShuffledRDD
import spark.SparkContext._
+
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("groupByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cbddf4e523..75778de1cc 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -170,7 +170,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
- assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 752e4b85e6..8745689c70 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -17,20 +17,14 @@
package spark
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-
-import com.google.common.io.Files
-import spark.rdd.ShuffledRDD
import spark.SparkContext._
+import spark.ShuffleSuite.NonJavaSerializableClass
+import spark.rdd.{SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
+import spark.util.MutablePair
+
class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test("groupByKey without compression") {
@@ -55,12 +49,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS),
- classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 10)
@@ -77,11 +71,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
sc = new SparkContext("local-cluster[2,1,512]", "test")
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(3), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(3)).setSerializer(classOf[spark.KryoSerializer].getName)
assert(c.count === 10)
}
@@ -96,7 +91,8 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD(b, new HashPartitioner(10), classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ .setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
@@ -121,7 +117,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD(b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
@@ -135,6 +131,72 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
}
+
+ test("shuffle using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
+ .collect()
+
+ data.foreach { pair => results should contain (pair) }
+ }
+
+ test("sorting using mutable pairs") {
+ // This is not in SortingSuite because of the local cluster setup.
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
+ .sortByKey().collect()
+ results(0) should be (p(1, 11))
+ results(1) should be (p(2, 22))
+ results(2) should be (p(3, 33))
+ results(3) should be (p(100, 100))
+ }
+
+ test("cogroup using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+
+ assert(results(1)(0).length === 3)
+ assert(results(1)(0).contains(1))
+ assert(results(1)(0).contains(2))
+ assert(results(1)(0).contains(3))
+ assert(results(1)(1).length === 2)
+ assert(results(1)(1).contains("11"))
+ assert(results(1)(1).contains("12"))
+ assert(results(2)(0).length === 1)
+ assert(results(2)(0).contains(1))
+ assert(results(2)(1).length === 1)
+ assert(results(2)(1).contains("22"))
+ assert(results(3)(0).length === 0)
+ assert(results(3)(1).contains("3"))
+ }
+
+ test("subtract mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
+ results should have length (1)
+ // substracted rdd return results as Tuple2
+ results(0) should be ((3, 33))
+ }
}
object ShuffleSuite {
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 31c3b25c50..98a6c1a1c9 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -26,14 +26,14 @@ import scala.util.Random
class UtilsSuite extends FunSuite {
- test("memoryBytesToString") {
- assert(Utils.memoryBytesToString(10) === "10.0 B")
- assert(Utils.memoryBytesToString(1500) === "1500.0 B")
- assert(Utils.memoryBytesToString(2000000) === "1953.1 KB")
- assert(Utils.memoryBytesToString(2097152) === "2.0 MB")
- assert(Utils.memoryBytesToString(2306867) === "2.2 MB")
- assert(Utils.memoryBytesToString(5368709120L) === "5.0 GB")
- assert(Utils.memoryBytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
+ test("bytesToString") {
+ assert(Utils.bytesToString(10) === "10.0 B")
+ assert(Utils.bytesToString(1500) === "1500.0 B")
+ assert(Utils.bytesToString(2000000) === "1953.1 KB")
+ assert(Utils.bytesToString(2097152) === "2.0 MB")
+ assert(Utils.bytesToString(2306867) === "2.2 MB")
+ assert(Utils.bytesToString(5368709120L) === "5.0 GB")
+ assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
}
test("copyStream") {
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
index 5e6d7b09d8..bb5d379273 100644
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
@@ -40,7 +40,7 @@ class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
- val zippedRDD = data1.zipPartitions(ZippedPartitionsSuite.procZippedData, data2, data3)
+ val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
val obtainedSizes = zippedRDD.collect()
val expectedSizes = Array(2, 3, 1, 2, 3, 1)
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index caaf3209fd..3b4a0d52fc 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -59,7 +59,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def stop() = {}
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
- taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration)
+ taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
taskSets += taskSet
}
override def setListener(listener: TaskSchedulerListener) = {}
@@ -299,10 +299,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
val reduceRdd = makeRdd(2, List(shuffleDep))
submit(reduceRdd, Array(0, 1))
// pretend we were told hostA went away
- val oldGeneration = mapOutputTracker.getGeneration
+ val oldEpoch = mapOutputTracker.getEpoch
runEvent(ExecutorLost("exec-hostA"))
- val newGeneration = mapOutputTracker.getGeneration
- assert(newGeneration > oldGeneration)
+ val newEpoch = mapOutputTracker.getEpoch
+ assert(newEpoch > oldEpoch)
val noAccum = Map[Long, Any]()
val taskSet = taskSets(0)
// should be ignored for being too old
@@ -311,8 +311,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB", 1), noAccum, null, null))
// should be ignored for being too old
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
- // should work because it's a new generation
- taskSet.tasks(1).generation = newGeneration
+ // should work because it's a new epoch
+ taskSet.tasks(1).epoch = newEpoch
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
@@ -401,12 +401,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(results === Map(0 -> 42))
}
- /** Assert that the supplied TaskSet has exactly the given preferredLocations. Note, converts taskSet's locations to host only. */
- private def assertLocations(taskSet: TaskSet, locations: Seq[Seq[String]]) {
- assert(locations.size === taskSet.tasks.size)
- for ((expectLocs, taskLocs) <-
- taskSet.tasks.map(_.preferredLocations).zip(locations)) {
- assert(expectLocs.map(loc => spark.Utils.parseHostPort(loc)._1) === taskLocs)
+ /**
+ * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
+ * Note that this checks only the host and not the executor ID.
+ */
+ private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
+ assert(hosts.size === taskSet.tasks.size)
+ for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
+ assert(taskLocs.map(_.host) === expectedLocs)
}
}
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 05afcd6567..abfdabf5fe 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.scheduler
+package spark.scheduler.cluster
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
@@ -27,7 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import java.util.Properties
-class DummyTaskSetManager(
+class FakeTaskSetManager(
initPriority: Int,
initStageId: Int,
initNumTasks: Int,
@@ -72,10 +72,16 @@ class DummyTaskSetManager(
override def executorLost(executorId: String, host: String): Unit = {
}
- override def slaveOffer(execId: String, host: String, avaiableCpus: Double, overrideLocality: TaskLocality.TaskLocality = null): Option[TaskDescription] = {
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
+ : Option[TaskDescription] =
+ {
if (tasksFinished + runningTasks < numTasks) {
increaseRunningTasks(1)
- return Some(new TaskDescription(0, execId, "task 0:0", null))
+ return Some(new TaskDescription(0, execId, "task 0:0", 0, null))
}
return None
}
@@ -98,17 +104,10 @@ class DummyTaskSetManager(
}
}
-class DummyTask(stageId: Int) extends Task[Int](stageId)
-{
- def run(attemptId: Long): Int = {
- return 0
- }
-}
-
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
- def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
- new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
+ def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+ new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
}
def resourceOffer(rootPool: Pool): Int = {
@@ -118,7 +117,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
- taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
+ taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -135,7 +134,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
@@ -162,7 +161,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
@@ -219,7 +218,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
- val task = new DummyTask(0)
+ val task = new FakeTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
new file mode 100644
index 0000000000..5a0b949ef5
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
+
+import org.scalatest.FunSuite
+
+import spark._
+import spark.scheduler._
+import spark.executor.TaskMetrics
+import java.nio.ByteBuffer
+import spark.util.FakeClock
+
+/**
+ * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * feedback received from the TaskSetManagers. Note that it's important to initialize this with
+ * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
+ * to work, and these are required for locality in ClusterTaskSetManager.
+ */
+class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+ extends ClusterScheduler(sc)
+{
+ val startedTasks = new ArrayBuffer[Long]
+ val endedTasks = new mutable.HashMap[Long, TaskEndReason]
+ val finishedManagers = new ArrayBuffer[TaskSetManager]
+
+ val executors = new mutable.HashMap[String, String] ++ liveExecutors
+
+ listener = new TaskSchedulerListener {
+ def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ startedTasks += taskInfo.index
+ }
+
+ def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: mutable.Map[Long, Any],
+ taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics)
+ {
+ endedTasks(taskInfo.index) = reason
+ }
+
+ def executorGained(execId: String, host: String) {}
+
+ def executorLost(execId: String) {}
+
+ def taskSetFailed(taskSet: TaskSet, reason: String) {}
+ }
+
+ def removeExecutor(execId: String): Unit = executors -= execId
+
+ override def taskSetFinished(manager: TaskSetManager): Unit = finishedManagers += manager
+
+ override def isExecutorAlive(execId: String): Boolean = executors.contains(execId)
+
+ override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
+}
+
+class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+ import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
+
+ val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
+
+ test("TaskSet with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(1)
+ val manager = new ClusterTaskSetManager(sched, taskSet)
+
+ // Offer a host with no CPUs
+ assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
+
+ // Offer a host with process-local as the constraint; this should work because the TaskSet
+ // above won't have any locality preferences
+ val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ assert(sched.startedTasks.contains(0))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
+
+ // Tell it the task has finished
+ manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("multiple offers with no preferences") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+ val taskSet = createTaskSet(3)
+ val manager = new ClusterTaskSetManager(sched, taskSet)
+
+ // First three offers should all find tasks
+ for (i <- 0 until 3) {
+ val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === "exec1")
+ }
+ assert(sched.startedTasks.toSet === Set(0, 1, 2))
+
+ // Re-offer the host -- now we should get no more tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Finish the first two tasks
+ manager.statusUpdate(0, TaskState.FINISHED, createTaskResult(0))
+ manager.statusUpdate(1, TaskState.FINISHED, createTaskResult(1))
+ assert(sched.endedTasks(0) === Success)
+ assert(sched.endedTasks(1) === Success)
+ assert(!sched.finishedManagers.contains(manager))
+
+ // Finish the last task
+ manager.statusUpdate(2, TaskState.FINISHED, createTaskResult(2))
+ assert(sched.endedTasks(2) === Success)
+ assert(sched.finishedManagers.contains(manager))
+ }
+
+ test("basic delay scheduling") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(4,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host2", "exec2")),
+ Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
+ Seq() // Last task has no locality prefs
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1, exec1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
+
+ // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
+
+ // Offer host1, exec1 again, at ANY level: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1, exec1 again, at ANY level: task 1 should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ }
+
+ test("delay scheduling with fallback") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc,
+ ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+ val taskSet = createTaskSet(5,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3")),
+ Seq(TaskLocation("host2"))
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // Offer host1 again: second task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Offer host1 again: third task (on host2) should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // Offer host2: fifth task (also on host2) should get chosen
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
+
+ // Now that we've launched a local task, we should no longer launch the task for host3
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+
+ clock.advance(LOCALITY_WAIT)
+
+ // After another delay, we can go ahead and launch that task non-locally
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
+ }
+
+ test("delay scheduling with failed hosts") {
+ sc = new SparkContext("local", "test")
+ val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+ val taskSet = createTaskSet(3,
+ Seq(TaskLocation("host1")),
+ Seq(TaskLocation("host2")),
+ Seq(TaskLocation("host3"))
+ )
+ val clock = new FakeClock
+ val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+
+ // First offer host1: first task should be chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
+
+ // Offer host1 again: third task should be chosen immediately because host3 is not up
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
+
+ // After this, nothing should get chosen
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+
+ // Now mark host2 as dead
+ sched.removeExecutor("exec2")
+ manager.executorLost("exec2", "host2")
+
+ // Task 1 should immediately be launched on host1 because its original host is gone
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
+
+ // Now that all tasks have launched, nothing new should be launched anywhere else
+ assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
+ assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
+ }
+
+ /**
+ * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+ * locations for each task (given as varargs) if this sequence is not empty.
+ */
+ def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+ if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+ throw new IllegalArgumentException("Wrong number of task locations")
+ }
+ val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+ new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+ }
+ new TaskSet(tasks, 0, 0, 0, null)
+ }
+
+ def createTaskResult(id: Int): ByteBuffer = {
+ ByteBuffer.wrap(Utils.serialize(new TaskResult[Int](id, mutable.Map.empty, new TaskMetrics)))
+ }
+}
diff --git a/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
new file mode 100644
index 0000000000..de9e66be20
--- /dev/null
+++ b/core/src/test/scala/spark/scheduler/cluster/FakeTask.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.scheduler.cluster
+
+import spark.scheduler.{TaskLocation, Task}
+
+class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId) {
+ override def run(attemptId: Long): Int = 0
+
+ override def preferredLocations: Seq[TaskLocation] = prefLocs
+}
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
index a79b8bf256..d28ee47fa3 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.scheduler
+package spark.scheduler.local
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index 56c1fed6ad..735a794396 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -24,14 +24,15 @@ import org.eclipse.jetty.server.Server
class UISuite extends FunSuite {
test("jetty port increases under contention") {
- val startPort = 33333
+ val startPort = 3030
val server = new Server(startPort)
server.start()
val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
- assert(boundPort1 === startPort + 1)
- assert(boundPort2 === startPort + 2)
+ // Allow some wiggle room in case ports on the machine are under contention
+ assert(boundPort1 > startPort && boundPort1 < startPort + 10)
+ assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
}
test("jetty binds to port 0 correctly") {
diff --git a/core/src/test/scala/spark/util/FakeClock.scala b/core/src/test/scala/spark/util/FakeClock.scala
new file mode 100644
index 0000000000..236706317e
--- /dev/null
+++ b/core/src/test/scala/spark/util/FakeClock.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+class FakeClock extends Clock {
+ private var time = 0L
+
+ def advance(millis: Long): Unit = time += millis
+
+ def getTime(): Long = time
+}
diff --git a/docs/configuration.md b/docs/configuration.md
index 99624a44aa..dff08a06f5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
<td>3000</td>
<td>
Number of milliseconds to wait to launch a data-local task before giving up and launching it
- in a non-data-local location. You should increase this if your tasks are long and you are seeing
- poor data locality, but the default generally works well.
+ on a less-local node. The same wait will be used to step through multiple locality levels
+ (process-local, node-local, rack-local and then any). It is also possible to customize the
+ waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+ You should increase this setting if your tasks are long and see poor locality, but the
+ default usually works well.
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.process</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for process locality. This affects tasks that attempt to access
+ cached data in a particular executor process.
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.node</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for node locality. For example, you can set this to 0 to skip
+ node locality and search immediately for rack locality (if your cluster has rack information).
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.rack</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for rack locality.
</td>
</tr>
<tr>
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 740ec08542..6127c10ced 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -178,7 +178,7 @@ def launch_cluster(conn, opts, cluster_name):
master_group.authorize('tcp', 50030, 50030, '0.0.0.0/0')
master_group.authorize('tcp', 50070, 50070, '0.0.0.0/0')
master_group.authorize('tcp', 60070, 60070, '0.0.0.0/0')
- master_group.authorize('tcp', 33000, 33010, '0.0.0.0/0')
+ master_group.authorize('tcp', 3030, 3035, '0.0.0.0/0')
if opts.cluster_type == "mesos":
master_group.authorize('tcp', 38090, 38090, '0.0.0.0/0')
if opts.ganglia:
diff --git a/pom.xml b/pom.xml
index e7445319dd..fc0b314070 100644
--- a/pom.xml
+++ b/pom.xml
@@ -256,11 +256,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>net.liftweb</groupId>
- <artifactId>lift-json_2.9.2</artifactId>
- <version>2.5</version>
- </dependency>
- <dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.0</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 282b0cbed5..831bfbed78 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -185,7 +185,6 @@ object SparkBuild extends Build {
"com.typesafe.akka" % "akka-slf4j" % "2.0.5" excludeAll(excludeNetty),
"it.unimi.dsi" % "fastutil" % "6.4.4",
"colt" % "colt" % "1.2.0",
- "net.liftweb" % "lift-json_2.9.2" % "2.5",
"org.apache.mesos" % "mesos" % "0.12.1",
"io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test",
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index c2b49ff37a..2803ce90f3 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -46,6 +46,7 @@ class SparkContext(object):
_next_accum_id = 0
_active_spark_context = None
_lock = Lock()
+ _python_includes = None # zip and egg files that need to be added to PYTHONPATH
def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
environment=None, batchSize=1024):
@@ -103,11 +104,14 @@ class SparkContext(object):
# send.
self._pickled_broadcast_vars = set()
+ SparkFiles._sc = self
+ root_dir = SparkFiles.getRootDirectory()
+ sys.path.append(root_dir)
+
# Deploy any code dependencies specified in the constructor
+ self._python_includes = list()
for path in (pyFiles or []):
self.addPyFile(path)
- SparkFiles._sc = self
- sys.path.append(SparkFiles.getRootDirectory())
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.spark.Utils.getLocalDir()
@@ -257,7 +261,11 @@ class SparkContext(object):
HTTP, HTTPS or FTP URI.
"""
self.addFile(path)
- filename = path.split("/")[-1]
+ (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+
+ if filename.endswith('.zip') or filename.endswith('.ZIP') or filename.endswith('.egg'):
+ self._python_includes.append(filename)
+ sys.path.append(os.path.join(SparkFiles.getRootDirectory(), filename)) # for tests in local mode
def setCheckpointDir(self, dirName, useExisting=False):
"""
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 51c2cb9806..99f5967a8e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -758,8 +758,10 @@ class PipelinedRDD(RDD):
class_manifest = self._prev_jrdd.classManifest()
env = MapConverter().convert(self.ctx.environment,
self.ctx._gateway._gateway_client)
+ includes = ListConverter().convert(self.ctx._python_includes,
+ self.ctx._gateway._gateway_client)
python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
- pipe_command, env, self.preservesPartitioning, self.ctx.pythonExec,
+ pipe_command, env, includes, self.preservesPartitioning, self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator, class_manifest)
self._jrdd_val = python_rdd.asJavaRDD()
return self._jrdd_val
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index f75215a781..29d6a128f6 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -125,6 +125,17 @@ class TestAddFile(PySparkTestCase):
from userlibrary import UserClass
self.assertEqual("Hello World!", UserClass().hello())
+ def test_add_egg_file_locally(self):
+ # To ensure that we're actually testing addPyFile's effects, check that
+ # this fails due to `userlibrary` not being on the Python path:
+ def func():
+ from userlib import UserClass
+ self.assertRaises(ImportError, func)
+ path = os.path.join(SPARK_HOME, "python/test_support/userlib-0.1-py2.7.egg")
+ self.sc.addPyFile(path)
+ from userlib import UserClass
+ self.assertEqual("Hello World from inside a package!", UserClass().hello())
+
class TestIO(PySparkTestCase):
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 75d692beeb..695f6dfb84 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -49,15 +49,26 @@ def main(infile, outfile):
split_index = read_int(infile)
if split_index == -1: # for unit tests
return
+
+ # fetch name of workdir
spark_files_dir = load_pickle(read_with_length(infile))
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
- sys.path.append(spark_files_dir)
+
+ # fetch names and values of broadcast variables
num_broadcast_variables = read_int(infile)
for _ in range(num_broadcast_variables):
bid = read_long(infile)
value = read_with_length(infile)
_broadcastRegistry[bid] = Broadcast(bid, load_pickle(value))
+
+ # fetch names of includes (*.zip and *.egg files) and construct PYTHONPATH
+ sys.path.append(spark_files_dir) # *.py files that were added will be copied here
+ num_python_includes = read_int(infile)
+ for _ in range(num_python_includes):
+ sys.path.append(os.path.join(spark_files_dir, load_pickle(read_with_length(infile))))
+
+ # now load function
func = load_obj(infile)
bypassSerializer = load_obj(infile)
if bypassSerializer:
diff --git a/python/test_support/userlib-0.1-py2.7.egg b/python/test_support/userlib-0.1-py2.7.egg
new file mode 100644
index 0000000000..1674c9cb22
--- /dev/null
+++ b/python/test_support/userlib-0.1-py2.7.egg
Binary files differ
diff --git a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
index 30fded12f0..f45d0b281c 100644
--- a/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/spark/tools/JavaAPICompletenessChecker.scala
@@ -17,13 +17,15 @@
package spark.tools
-import spark._
import java.lang.reflect.Method
+
import scala.collection.mutable.ArrayBuffer
+
+import spark._
import spark.api.java._
+import spark.rdd.OrderedRDDFunctions
import spark.streaming.{PairDStreamFunctions, DStream, StreamingContext}
import spark.streaming.api.java.{JavaPairDStream, JavaDStream, JavaStreamingContext}
-import scala.Tuple2
private[spark] abstract class SparkType(val name: String)
@@ -336,7 +338,7 @@ object JavaAPICompletenessChecker {
println()
println("Missing OrderedRDD methods")
- printMissingMethods(classOf[OrderedRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]])
+ printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]])
println()
println("Missing SparkContext methods")
diff --git a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
index 307d96111c..bb58353e0c 100644
--- a/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -41,13 +41,6 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
if (retval != null) Some(retval) else None
}
- // By default, if rack is unknown, return nothing
- override def getCachedHostsForRack(rack: String): Option[Set[String]] = {
- if (rack == None || rack == null) return None
-
- YarnAllocationHandler.fetchCachedHostsForRack(rack)
- }
-
override def postStartHook() {
val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
if (sparkContextInitialized){