author     Patrick Wendell <pwendell@gmail.com>    2013-02-09 21:55:17 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-02-09 21:55:17 -0800
commit     1859c9f93c409a355b404d24b5632b3822ad42c1
tree       5d48472668890f8da75a7b2385ab43b97c8ccb80
parent     b14322956cbf268b0c880f17188af24ba4884d5b
Changing to use Timer based on code review
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 51
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala   |  8
2 files changed, 20 insertions(+), 39 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index aed9826377..04d01e9ce8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,8 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
-  // How often to check for starved TaskSets
-  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -32,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  var hasReceivedTask = false
+  var hasLaunchedTask = false
+  val starvationTimer = new Timer(true)
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
 
@@ -86,21 +91,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
-
-    new Thread("ClusterScheduler starvation check") {
-      setDaemon(true)
-
-      override def run() {
-        while (true) {
-          try {
-            Thread.sleep(STARVATION_CHECK_INTERVAL)
-          } catch {
-            case e: InterruptedException => {}
-          }
-          detectStarvedTaskSets()
-        }
-      }
-    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -111,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (hasReceivedTask == false) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true;
     }
     backend.reviveOffers()
   }
@@ -167,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
           }
         } while (launchedTask)
       }
+      if (tasks.size > 0) hasLaunchedTask = true
       return tasks
     }
   }
@@ -266,20 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  // Find and resource-starved TaskSets and alert the user
-  def detectStarvedTaskSets() {
-    val noOfferThresholdSeconds = 5
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        if (ts == TaskSetManager.firstTaskSet.get &&
-            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
-            ts.receivedOffers == 0) {
-          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
-        }
-      }
-    }
-  }
-
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 58c5d4553e..584cfdff45 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -44,7 +44,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
   val creationTime = System.currentTimeMillis
-  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -98,8 +97,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
-  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
-
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -192,7 +189,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -432,7 +428,3 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
-
-object TaskSetManager {
-  var firstTaskSet: Option[TaskSetManager] = None
-}
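
For readers skimming the change: the commit replaces a hand-rolled daemon thread (sleep, check, loop) with a java.util.Timer scheduled at a fixed rate, which fires only after the first TaskSet is submitted and goes quiet once a task launches. Below is a minimal, self-contained sketch of that pattern, not the Spark code itself: the object name, main driver, and println logging are invented for illustration, while the flag and timeout names mirror the diff. The @volatile annotation is added here because this sketch has no surrounding synchronized block.

import java.util.{Timer, TimerTask}

object StarvationWarningSketch {
  val STARVATION_TIMEOUT = 5000L          // delay and repeat period, in milliseconds
  val starvationTimer = new Timer(true)   // isDaemon = true: won't keep the JVM alive
  @volatile var hasLaunchedTask = false   // flipped once the first task is launched

  def onFirstTaskSetSubmitted(): Unit = {
    // Warn STARVATION_TIMEOUT ms after submission, then every STARVATION_TIMEOUT ms,
    // until the flag flips. The task itself is never cancelled; it just stops warning.
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          println("WARN: Initial TaskSet has not accepted any offers. " +
            "Check the scheduler UI to ensure slaves are registered.")
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }

  def main(args: Array[String]): Unit = {
    onFirstTaskSetSubmitted()
    Thread.sleep(12000)     // let the warning fire twice
    hasLaunchedTask = true  // simulate the first offer being accepted
    Thread.sleep(6000)      // no further warnings after the flag flips
  }
}

As in the diff, the TimerTask is never cancelled after a task launches; it simply becomes a no-op wakeup, which keeps the hot path free of any extra locking. The Timer(true) constructor makes the timer thread a daemon, matching the setDaemon(true) behavior of the polling thread it replaces while eliminating the manual sleep/interrupt loop and the per-offer receivedOffers bookkeeping.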