author     Patrick Wendell <pwendell@gmail.com>     2013-02-02 22:21:36 -0800
committer  Patrick Wendell <pwendell@gmail.com>     2013-02-03 12:45:04 -0800
commit     667860448ad5f705dd7548263cf7f240def25d87 (patch)
tree       2c40efbe766c4ed97a330c14e7536427423178c6 /core/src
parent     34a7bcdb3a19deed18b25225daf47ff22ee20869 (diff)
Starvation check in ClusterScheduler
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 33
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala   |  9
2 files changed, 41 insertions(+), 1 deletion(-)
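
In brief: the patch gives ClusterScheduler a second background daemon thread, alongside the existing speculation thread, that wakes every STARVATION_CHECK_INTERVAL milliseconds and calls a new detectStarvedTaskSets() method, which logs a warning when the first TaskSet submitted to the application has waited several seconds without receiving a single resource offer. The polling interval follows the same system-property-with-default pattern the scheduler already uses for speculation. Below is a minimal runnable sketch of that lookup; the object name is illustrative and not part of the patch:

object IntervalLookupDemo {
  def main(args: Array[String]): Unit = {
    // Suppose a user tightened the check for a local test run.
    System.setProperty("spark.starvation_check.interval", "1000")
    // Same lookup pattern as spark.speculation.interval in the patch:
    // a system property with a string default, converted to Long.
    val intervalMs =
      System.getProperty("spark.starvation_check.interval", "5000").toLong
    println(s"starvation check every $intervalMs ms") // prints 1000
  }
}
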
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..aed9826377 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -22,6 +22,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+  // How often to check for starved TaskSets
+  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -84,6 +86,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
+
+    new Thread("ClusterScheduler starvation check") {
+      setDaemon(true)
+
+      override def run() {
+        while (true) {
+          try {
+            Thread.sleep(STARVATION_CHECK_INTERVAL)
+          } catch {
+            case e: InterruptedException => {}
+          }
+          detectStarvedTaskSets()
+        }
+      }
+    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
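
The starvation check reuses the daemon-thread polling idiom of the speculation check just above it: sleep for the interval, swallow InterruptedException, run the check, repeat. A self-contained sketch of that idiom under illustrative names follows; the println stands in for detectStarvedTaskSets():

object PollingDaemonDemo {
  def main(args: Array[String]): Unit = {
    val intervalMs = 100L
    new Thread("polling-daemon-demo") {
      setDaemon(true) // a daemon thread never keeps the JVM alive on its own
      override def run(): Unit = {
        while (true) {
          try {
            Thread.sleep(intervalMs)
          } catch {
            case _: InterruptedException => // ignored, as in the patch
          }
          println("checking...") // stand-in for detectStarvedTaskSets()
        }
      }
    }.start()
    Thread.sleep(350) // let it fire a few times; JVM exit then kills the daemon
  }
}

Because the thread is marked as a daemon before start(), it cannot keep the process alive by itself, which is why the loop never needs an exit condition.
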
@@ -235,7 +252,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
-  
+
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false
@@ -249,6 +266,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
+  // Find any resource-starved TaskSets and alert the user
+  def detectStarvedTaskSets() {
+    val noOfferThresholdSeconds = 5
+    synchronized {
+      for (ts <- activeTaskSetsQueue) {
+        if (ts == TaskSetManager.firstTaskSet.get &&
+            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
+            ts.receivedOffers == 0) {
+          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
+        }
+      }
+    }
+  }
+
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
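
The warning fires only for the first TaskSet ever constructed (a proxy for "this application has never been offered resources"), and only once it has existed longer than noOfferThresholdSeconds with receivedOffers still at zero. The predicate in isolation, as a runnable sketch; TrackedSet and its fields are illustrative stand-ins for TaskSetManager's creationTime and receivedOffers:

object StarvationPredicateDemo {
  final case class TrackedSet(creationTime: Long, receivedOffers: Int)

  // Mirrors the patch's condition: old enough, and never offered resources.
  def isStarved(ts: TrackedSet, nowMs: Long, thresholdSeconds: Int = 5): Boolean =
    nowMs - ts.creationTime > thresholdSeconds * 1000 && ts.receivedOffers == 0

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis
    println(isStarved(TrackedSet(now - 6000, 0), now)) // true: 6 s, no offers
    println(isStarved(TrackedSet(now - 6000, 3), now)) // false: offers arrived
    println(isStarved(TrackedSet(now - 1000, 0), now)) // false: too recent
  }
}
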
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 3dabdd76b1..58c5d4553e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -43,6 +43,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val numFailures = new Array[Int](numTasks)
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
+  val creationTime = System.currentTimeMillis
+  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -96,6 +98,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
+  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
+
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -188,6 +192,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -427,3 +432,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
+
+object TaskSetManager {
+  var firstTaskSet: Option[TaskSetManager] = None
+}
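
The companion object records the first TaskSetManager ever constructed, which is what detectStarvedTaskSets() compares against. A minimal sketch of this record-the-first-instance pattern, with an illustrative class name; like the patch, it is unsynchronized, which is tolerable when only the very first registration matters:

class Worker {
  // Constructor body runs at instantiation; claim the slot if still empty.
  if (Worker.first.isEmpty) Worker.first = Some(this)
}

object Worker {
  var first: Option[Worker] = None
}

object FirstInstanceDemo {
  def main(args: Array[String]): Unit = {
    val a = new Worker
    val b = new Worker
    println(Worker.first.contains(a)) // true: the first instance wins
    println(Worker.first.contains(b)) // false
  }
}
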