From 667860448ad5f705dd7548263cf7f240def25d87 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 2 Feb 2013 22:21:36 -0800
Subject: Starvation check in ClusterScheduler

---
 .../spark/scheduler/cluster/ClusterScheduler.scala | 33 +++++++++++++++++++++-
 .../spark/scheduler/cluster/TaskSetManager.scala   |  9 ++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..aed9826377 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -22,6 +22,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+  // How often to check for starved TaskSets
+  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -84,6 +86,21 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
+
+    new Thread("ClusterScheduler starvation check") {
+      setDaemon(true)
+
+      override def run() {
+        while (true) {
+          try {
+            Thread.sleep(STARVATION_CHECK_INTERVAL)
+          } catch {
+            case e: InterruptedException => {}
+          }
+          detectStarvedTaskSets()
+        }
+      }
+    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -235,7 +252,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   }
 
   override def defaultParallelism() = backend.defaultParallelism()
-  
+
   // Check for speculatable tasks in all our active jobs.
   def checkSpeculatableTasks() {
     var shouldRevive = false
@@ -249,6 +266,20 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
+  // Find any resource-starved TaskSets and alert the user
+  def detectStarvedTaskSets() {
+    val noOfferThresholdSeconds = 5
+    synchronized {
+      for (ts <- activeTaskSetsQueue) {
+        if (ts == TaskSetManager.firstTaskSet.get &&
+            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
+            ts.receivedOffers == 0) {
+          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
+        }
+      }
+    }
+  }
+
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 3dabdd76b1..58c5d4553e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -43,6 +43,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val numFailures = new Array[Int](numTasks)
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
+  val creationTime = System.currentTimeMillis
+  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -96,6 +98,8 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
+  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
+
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -188,6 +192,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
+    receivedOffers += 1
    if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -427,3 +432,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
+
+object TaskSetManager {
+  var firstTaskSet: Option[TaskSetManager] = None
+}
-- 
cgit v1.2.3
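
For reference, here is a minimal stand-alone sketch of the detection pattern this first patch uses: a daemon thread wakes on a fixed interval and warns once the first TaskSet has been alive past a threshold without receiving a single resource offer. The WatchedTaskSet and StarvationWatcher names and the stderr logging are illustrative stand-ins, not Spark API.

    import java.util.concurrent.atomic.AtomicInteger

    // Illustrative stand-in for the bookkeeping TaskSetManager gains in this patch.
    class WatchedTaskSet {
      val creationTime = System.currentTimeMillis
      val receivedOffers = new AtomicInteger(0)  // incremented on every slaveOffer() call
    }

    // Daemon polling thread, mirroring the "ClusterScheduler starvation check" thread above.
    class StarvationWatcher(taskSet: WatchedTaskSet, checkIntervalMs: Long, thresholdMs: Long) {
      def start() {
        new Thread("starvation check") {
          setDaemon(true)  // do not keep the JVM alive on shutdown
          override def run() {
            while (true) {
              try {
                Thread.sleep(checkIntervalMs)
              } catch {
                case e: InterruptedException => // ignore and re-check
              }
              val age = System.currentTimeMillis - taskSet.creationTime
              if (age > thresholdMs && taskSet.receivedOffers.get == 0) {
                System.err.println("WARN: No offers received. " +
                  "Check the scheduler UI to ensure slaves are registered.")
              }
            }
          }
        }.start()
      }
    }

One cost of this design, which the Timer-based revision further below removes, is that the polling thread runs for the scheduler's whole lifetime even after offers start flowing.
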
From b14322956cbf268b0c880f17188af24ba4884d5b Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 3 Feb 2013 12:17:20 -0800
Subject: Starvation check in Standalone scheduler

---
 core/src/main/scala/spark/deploy/master/Master.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c618e87cdd..8513dcefa0 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  var firstJob: Option[JobInfo] = None
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else ip
@@ -191,6 +193,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         }
       }
     }
+    if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
+        firstJob.isDefined &&
+        firstJob.get.executors.size == 0) {
+      logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
+    }
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -232,6 +239,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     idToJob(job.id) = job
     actorToJob(driver) = job
     addressToJob(driver.path.address) = job
+    if (!firstJob.isDefined) firstJob = Some(job)
     return job
   }
 
-- 
cgit v1.2.3
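
The standalone Master applies the same remember-the-first-submission idea at the cluster level: warn when at least one worker is alive yet the first job has never been granted an executor, which points at a memory misconfiguration. A condensed sketch of that check, with simplified stand-in types (the real WorkerInfo and JobInfo carry much more state):

    import scala.collection.mutable.{HashMap, HashSet}

    object WorkerState extends Enumeration {
      val ALIVE, DEAD = Value
    }

    case class WorkerInfo(id: String, state: WorkerState.Value)

    class JobInfo {
      val executors = new HashMap[Int, String]()  // executor id -> worker id
    }

    class MasterSketch {
      val workers = new HashSet[WorkerInfo]
      var firstJob: Option[JobInfo] = None

      // Run at the end of every scheduling pass, as the patch does in schedule():
      // live workers exist, but the first job still has no executors, meaning no
      // machine had enough free memory to host one.
      def warnIfMemoryStarved() {
        if (workers.exists(_.state == WorkerState.ALIVE) &&
            firstJob.isDefined &&
            firstJob.get.executors.size == 0) {
          System.err.println("WARN: Could not find any machines with enough memory. " +
            "Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
        }
      }
    }
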
From 1859c9f93c409a355b404d24b5632b3822ad42c1 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 9 Feb 2013 21:55:17 -0800
Subject: Changing to use Timer based on code review

---
 .../spark/scheduler/cluster/ClusterScheduler.scala | 51 +++++++++-------------
 .../spark/scheduler/cluster/TaskSetManager.scala   |  8 ----
 2 files changed, 20 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index aed9826377..04d01e9ce8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
 import spark.scheduler._
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
 
 /**
  * The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,8 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
 
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
-  // How often to check for starved TaskSets
-  val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
+  // Threshold above which we warn user initial TaskSet may be starved
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -32,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   val taskIdToExecutorId = new HashMap[Long, String]
   val taskSetTaskIds = new HashMap[String, HashSet[Long]]
 
+  var hasReceivedTask = false
+  var hasLaunchedTask = false
+  val starvationTimer = new Timer(true)
+
   // Incrementing Mesos task IDs
   val nextTaskId = new AtomicLong(0)
 
@@ -86,21 +91,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       }.start()
     }
-
-    new Thread("ClusterScheduler starvation check") {
-      setDaemon(true)
-
-      override def run() {
-        while (true) {
-          try {
-            Thread.sleep(STARVATION_CHECK_INTERVAL)
-          } catch {
-            case e: InterruptedException => {}
-          }
-          detectStarvedTaskSets()
-        }
-      }
-    }.start()
   }
 
   override def submitTasks(taskSet: TaskSet) {
@@ -111,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       activeTaskSets(taskSet.id) = manager
       activeTaskSetsQueue += manager
       taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+      if (hasReceivedTask == false) {
+        starvationTimer.scheduleAtFixedRate(new TimerTask() {
+          override def run() {
+            if (!hasLaunchedTask) {
+              logWarning("Initial TaskSet has not accepted any offers. " +
+                "Check the scheduler UI to ensure slaves are registered.")
+            }
+          }
+        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+      }
+      hasReceivedTask = true;
     }
     backend.reviveOffers()
   }
@@ -167,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
         }
       } while (launchedTask)
     }
+    if (tasks.size > 0) hasLaunchedTask = true
     return tasks
   }
 }
@@ -266,20 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     }
   }
 
-  // Find any resource-starved TaskSets and alert the user
-  def detectStarvedTaskSets() {
-    val noOfferThresholdSeconds = 5
-    synchronized {
-      for (ts <- activeTaskSetsQueue) {
-        if (ts == TaskSetManager.firstTaskSet.get &&
-            (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
-            ts.receivedOffers == 0) {
-          logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
-        }
-      }
-    }
-  }
-
   def executorLost(executorId: String, reason: ExecutorLossReason) {
     var failedExecutor: Option[String] = None
     synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 58c5d4553e..584cfdff45 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -44,7 +44,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
   val creationTime = System.currentTimeMillis
-  var receivedOffers = 0
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
@@ -98,8 +97,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     addPendingTask(i)
   }
 
-  if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
-
   // Add a task to all the pending-task lists that it should be on.
   private def addPendingTask(index: Int) {
     val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -192,7 +189,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
 
   // Respond to an offer of a single slave from the scheduler by finding a task
   def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
-    receivedOffers += 1
     if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
       val time = System.currentTimeMillis
       val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -432,7 +428,3 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
     return foundTasks
   }
 }
-
-object TaskSetManager {
-  var firstTaskSet: Option[TaskSetManager] = None
-}
-- 
cgit v1.2.3
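
The review feedback replaces the hand-rolled polling thread with a java.util.Timer: nothing is scheduled until the first TaskSet arrives, and the recurring task warns only while no task has launched yet. A minimal stand-alone sketch of that pattern follows; the StarvationMonitor wrapper, the cancel() call, and the stderr logging are illustrative additions, not code from the patch.

    import java.util.{Timer, TimerTask}

    class StarvationMonitor(timeoutMs: Long) {
      @volatile private var hasLaunchedTask = false
      private val starvationTimer = new Timer(true)  // daemon timer: won't block JVM exit

      // Call when the first TaskSet is submitted, as submitTasks() does above.
      def onFirstTaskSet() {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (hasLaunchedTask) {
              cancel()  // unlike the patch, stop firing once work is running
            } else {
              System.err.println("WARN: Initial TaskSet has not accepted any offers. " +
                "Check the scheduler UI to ensure slaves are registered.")
            }
          }
        }, timeoutMs, timeoutMs)  // first check after timeoutMs, then every timeoutMs
      }

      // Call whenever a scheduling pass launches at least one task.
      def taskLaunched() { hasLaunchedTask = true }
    }

Besides being lazier (no extra thread exists until a TaskSet does), the Timer version lets one shared hasLaunchedTask flag replace the per-TaskSet creationTime and receivedOffers fields and the firstTaskSet singleton, which is why the TaskSetManager companion object can be deleted.
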
From 2ed791fd7fa193ea7e10d70e1c1b0787d584b0fd Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sat, 9 Feb 2013 21:59:01 -0800
Subject: Minor fixes

---
 core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 584cfdff45..3dabdd76b1 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -43,7 +43,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
   val numFailures = new Array[Int](numTasks)
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   var tasksFinished = 0
-  val creationTime = System.currentTimeMillis
 
   // Last time when we launched a preferred task (for delay scheduling)
   var lastPreferredLaunchTime = System.currentTimeMillis
-- 
cgit v1.2.3
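
After the Timer change, the warning delay is read once from the spark.starvation.timeout system property (milliseconds, defaulting to 5000). A deployment where executors are expected to register slowly could raise it before the scheduler is constructed; a hypothetical tuning example:

    // Hypothetical tuning example: delay the first starvation warning to 30 seconds.
    // Must run before the ClusterScheduler is constructed, since the property is
    // read once at construction time.
    System.setProperty("spark.starvation.timeout", "30000")
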