diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-02-02 22:21:36 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-02-03 12:45:04 -0800 |
commit | 667860448ad5f705dd7548263cf7f240def25d87 (patch) | |
tree | 2c40efbe766c4ed97a330c14e7536427423178c6 /core/src | |
parent | 34a7bcdb3a19deed18b25225daf47ff22ee20869 (diff) | |
download | spark-667860448ad5f705dd7548263cf7f240def25d87.tar.gz spark-667860448ad5f705dd7548263cf7f240def25d87.tar.bz2 spark-667860448ad5f705dd7548263cf7f240def25d87.zip |
Starvation check in ClusterScheduler
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 33 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 9 |
2 files changed, 41 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..aed9826377 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -22,6 +22,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+  // How often to check for starved TaskSets
+  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -84,6 +86,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
+
+    new Thread("ClusterScheduler starvation check") {
+      setDaemon(true)
+
+      override def run() {
+        while (true) {
+          try {
+            Thread.sleep(STARVATION_CHECK_INTERVAL)
+          } catch {
+            case e: InterruptedException => {}
+          }
+          detectStarvedTaskSets()
+        }
+      }
+    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -235,7 +252,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
-  
+
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false
@@ -249,6 +266,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
+  // Find and resource-starved TaskSets and alert the user
+  def detectStarvedTaskSets() {
+    val noOfferThresholdSeconds = 5
+    synchronized {
+      for (ts <- activeTaskSetsQueue) {
+        if (ts == TaskSetManager.firstTaskSet.get &&
+            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
+            ts.receivedOffers == 0) {
+          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
+        }
+      }
+    }
+  }
+
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 3dabdd76b1..58c5d4553e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -43,6 +43,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val numFailures = new Array[Int](numTasks)
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
+  val creationTime = System.currentTimeMillis
+  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -96,6 +98,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
+  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
+
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -188,6 +192,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -427,3 +432,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
+
+object TaskSetManager {
+  var firstTaskSet: Option[TaskSetManager] = None
+}