author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-08-14 13:56:40 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-08-18 19:51:07 -0700
commit    222c8971285190761354456c2fe07f5c31edf330
tree      c9e7b734bf6aa31619653b55fdb42cdf1f477b17 /core
parent    cf39d45d14e0256bbd3ffe206c14997f02429cb3
Comment cleanup (via Kay) and some debug messages
Diffstat (limited to 'core')
 -rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala      | 30
 -rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala |  5
 -rw-r--r--  core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala) | 2
 -rw-r--r--  core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala (renamed from core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala) | 2
4 files changed, 16 insertions, 23 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index ec76e90185..028f4d3283 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -385,25 +385,17 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
object ClusterScheduler {
-
- // Used to 'spray' available containers across the available set to ensure too many containers on same host
- // are not used up. Used in yarn mode and in task scheduling (when there are multiple containers available
- // to execute a task)
- // For example: yarn can returns more containers than we would have requested under ANY, this method
- // prioritizes how to use the allocated containers.
- // flatten the map such that the array buffer entries are spread out across the returned value.
- // given <host, list[container]> == <h1, [c1 .. c5]>, <h2, [c1 .. c3]>, <h3, [c1, c2]>, <h4, c1>, <h5, c1>,
- // the return value would be something like : h1c1, h2c1, h3c1, h4c1, h5c1, h1c2, h2c2, h3c2, h1c3, h2c3, h1c4, h1c5
- // We then 'use' the containers in this order (consuming only the top K from this list where
- // K = number to be user). This is to ensure that if we have multiple eligible allocations,
- // they dont end up allocating all containers on a small number of hosts - increasing probability of
- // multiple container failure when a host goes down.
- // Note, there is bias for keys with higher number of entries in value to be picked first (by design)
- // Also note that invocation of this method is expected to have containers of same 'type'
- // (host-local, rack-local, off-rack) and not across types : so that reordering is simply better from
- // the available list - everything else being same.
- // That is, we we first consume data local, then rack local and finally off rack nodes. So the
- // prioritization from this method applies to within each category
+ /**
+ * Used to balance containers across hosts.
+ *
+ * Accepts a map of hosts to resource offers for that host, and returns a prioritized list of
+ * resource offers representing the order in which the offers should be used. The resource
+ * offers are ordered such that we'll allocate one container on each host before allocating a
+ * second container on any host, and so on, in order to reduce the damage if a host fails.
+ *
+ * For example, given <h1, [o1, o2, o3]>, <h2, [o4]>, <h3, [o5, o6]>, returns
+ * [o1, o5, o4, o2, o6, o3]
+ */
def prioritizeContainers[K, T] (map: HashMap[K, ArrayBuffer[T]]): List[T] = {
val _keyList = new ArrayBuffer[K](map.size)
_keyList ++= map.keys
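
The hunk above shows only the first lines of the function body. As a rough illustration of the ordering the new doc comment describes, here is a minimal editor's sketch in Scala; the name prioritizeContainersSketch and the sort-then-round-robin structure are assumptions for illustration, not the committed implementation:

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Editor's sketch, not the committed code: one way to produce the
// interleaving the new comment describes. Hosts offering more containers
// are visited first, matching the "bias for keys with higher number of
// entries" noted in the old comment.
def prioritizeContainersSketch[K, T](map: HashMap[K, ArrayBuffer[T]]): List[T] = {
  // Visit hosts in order of how many offers they have, largest first.
  val keys = map.keys.toList.sortBy(k => -map(k).size)
  val rounds = keys.headOption.map(k => map(k).size).getOrElse(0)
  val result = new ArrayBuffer[T]
  // Round i takes the i-th offer from every host that still has one, so no
  // host gets a second container before every host has gotten its first.
  for (i <- 0 until rounds; k <- keys if i < map(k).size) {
    result += map(k)(i)
  }
  result.toList
}

// With h1 -> [o1, o2, o3], h2 -> [o4], h3 -> [o5, o6], this yields
// List(o1, o5, o4, o2, o6, o3), matching the example in the doc comment.
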
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index cf406f876f..5316a7aed1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -116,7 +116,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
// Figure out the current map output tracker epoch and set it on all tasks
val epoch = sched.mapOutputTracker.getEpoch
- logDebug("Epoch for " + taskSet.id + ": " + epoch)
+ logDebug("Epoch for " + taskSet + ": " + epoch)
for (t <- tasks) {
t.epoch = epoch
}
@@ -129,7 +129,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
val myLocalityLevels = computeValidLocalityLevels()
- val localityWaits = myLocalityLevels.map(getLocalityWait) // spark.locality.wait
+ val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
@@ -687,6 +687,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
levels += RACK_LOCAL
}
levels += ANY
+ logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
levels.toArray
}
}
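
The localityWaits comment and the new debug line above both concern delay scheduling: a task set prefers its most local level and falls back to a less local one only after waiting. A minimal sketch of that bookkeeping follows; everything except the names myLocalityLevels and localityWaits (which appear in the diff) is hypothetical, including the wait values:

// Editor's sketch of the delay scheduling described in the comments above;
// names and values are illustrative, not the committed implementation.
object DelaySchedulingSketch {
  val myLocalityLevels = Array("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaits = Array(3000L, 3000L, 3000L, 0L)  // ms to wait at each level

  var currentLevelIndex = 0                        // level we currently insist on
  var lastLaunchTime = System.currentTimeMillis()  // last task launch at that level

  // Most local level we may still schedule at as of `now`: if a level's wait
  // expired with no task launched there, relax to the next (less local) level.
  def allowedLevel(now: Long): String = {
    while (currentLevelIndex < myLocalityLevels.length - 1 &&
           now - lastLaunchTime >= localityWaits(currentLevelIndex)) {
      lastLaunchTime += localityWaits(currentLevelIndex)  // start next level's clock
      currentLevelIndex += 1
    }
    myLocalityLevels(currentLevelIndex)
  }
}
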
diff --git a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 6327155157..8618009ea6 100644
--- a/core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.scheduler
+package spark.scheduler.cluster
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
diff --git a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
index a79b8bf256..d28ee47fa3 100644
--- a/core/src/test/scala/spark/scheduler/LocalSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/local/LocalSchedulerSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package spark.scheduler
+package spark.scheduler.local
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter