author     Patrick Wendell <pwendell@gmail.com>  2013-02-09 21:55:17 -0800
committer  Patrick Wendell <pwendell@gmail.com>  2013-02-09 21:55:17 -0800
commit     1859c9f93c409a355b404d24b5632b3822ad42c1 (patch)
tree       5d48472668890f8da75a7b2385ab43b97c8ccb80
parent     b14322956cbf268b0c880f17188af24ba4884d5b (diff)
Changing to use Timer based on code review
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala  51
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala      8
2 files changed, 20 insertions(+), 39 deletions(-)
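
Since the page carries no discussion beyond the one-line commit message, here is a standalone Scala sketch of the java.util.Timer pattern the commit switches to: a daemon Timer fires a TimerTask at a fixed rate and warns until a flag records that a task has launched. The object and method names below are simplified stand-ins rather than Spark identifiers; only the property name, default value, and warning text are taken from the diff.

// Minimal sketch of the Timer-based starvation warning (simplified names).
import java.util.{Timer, TimerTask}

object StarvationWarningSketch {
  // Mirrors spark.starvation.timeout (milliseconds) from the diff.
  val starvationTimeout: Long =
    System.getProperty("spark.starvation.timeout", "5000").toLong

  @volatile var hasLaunchedTask = false

  // `true` makes the Timer's thread a daemon, so it never keeps the JVM alive.
  val starvationTimer = new Timer(true)

  // Called once, when the first TaskSet is submitted.
  def onFirstTaskSetSubmitted(): Unit = {
    starvationTimer.scheduleAtFixedRate(new TimerTask {
      override def run(): Unit = {
        if (!hasLaunchedTask) {
          println("WARN: Initial TaskSet has not accepted any offers. " +
            "Check the scheduler UI to ensure slaves are registered.")
        }
      }
    }, starvationTimeout, starvationTimeout) // initial delay, then period
  }

  def main(args: Array[String]): Unit = {
    onFirstTaskSetSubmitted()
    Thread.sleep(12000)      // with the 5s default, expect two warnings here
    hasLaunchedTask = true   // the task keeps firing but now stays silent
    Thread.sleep(6000)
  }
}

Compared with the removed polling thread, the Timer owns its daemon thread and handles the sleep-and-reschedule loop itself, so the scheduler no longer needs an explicit while/sleep loop or InterruptedException handling.
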
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index aed9826377..04d01e9ce8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
import spark.scheduler._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,8 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
- // How often to check for starved TaskSets
- val STARVATION_CHECK_INTERVAL = System.getProperty("spark.starvation_check.interval", "5000").toLong
+ // Threshold above which we warn user initial TaskSet may be starved
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -32,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+ var hasReceivedTask = false
+ var hasLaunchedTask = false
+ val starvationTimer = new Timer(true)
+
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
@@ -86,21 +91,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}.start()
}
-
- new Thread("ClusterScheduler starvation check") {
- setDaemon(true)
-
- override def run() {
- while (true) {
- try {
- Thread.sleep(STARVATION_CHECK_INTERVAL)
- } catch {
- case e: InterruptedException => {}
- }
- detectStarvedTaskSets()
- }
- }
- }.start()
}
override def submitTasks(taskSet: TaskSet) {
@@ -111,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets(taskSet.id) = manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+ if (hasReceivedTask == false) {
+ starvationTimer.scheduleAtFixedRate(new TimerTask() {
+ override def run() {
+ if (!hasLaunchedTask) {
+ logWarning("Initial TaskSet has not accepted any offers. " +
+ "Check the scheduler UI to ensure slaves are registered.")
+ }
+ }
+ }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }
+ hasReceivedTask = true;
}
backend.reviveOffers()
}
@@ -167,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
+ if (tasks.size > 0) hasLaunchedTask = true
return tasks
}
}
@@ -266,20 +269,6 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- // Find and resource-starved TaskSets and alert the user
- def detectStarvedTaskSets() {
- val noOfferThresholdSeconds = 5
- synchronized {
- for (ts <- activeTaskSetsQueue) {
- if (ts == TaskSetManager.firstTaskSet.get &&
- (System.currentTimeMillis - ts.creationTime > noOfferThresholdSeconds * 1000) &&
- ts.receivedOffers == 0) {
- logWarning("No offers received. Check the scheduler UI to ensure slaves are registered.")
- }
- }
- }
- }
-
def executorLost(executorId: String, reason: ExecutorLossReason) {
var failedExecutor: Option[String] = None
synchronized {
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 58c5d4553e..584cfdff45 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -44,7 +44,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksFinished = 0
val creationTime = System.currentTimeMillis
- var receivedOffers = 0
// Last time when we launched a preferred task (for delay scheduling)
var lastPreferredLaunchTime = System.currentTimeMillis
@@ -98,8 +97,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
addPendingTask(i)
}
- if (!TaskSetManager.firstTaskSet.isDefined) TaskSetManager.firstTaskSet = Some(this)
-
// Add a task to all the pending-task lists that it should be on.
private def addPendingTask(index: Int) {
val locations = tasks(index).preferredLocations.toSet & sched.hostsAlive
@@ -192,7 +189,6 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
// Respond to an offer of a single slave from the scheduler by finding a task
def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
- receivedOffers += 1
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -432,7 +428,3 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return foundTasks
}
}
-
-object TaskSetManager {
- var firstTaskSet: Option[TaskSetManager] = None
-}
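
One design note on the change above: the diff never cancels the TimerTask once hasLaunchedTask is set, so the timer keeps waking up and merely stays silent. The JDK's TimerTask.cancel() and Timer.purge() (standard java.util API) would stop the wakeups entirely; the sketch below is a hedged variant with hypothetical names, not code from the commit.

// Variant sketch (not part of the commit): keep a handle to the TimerTask so
// it can be cancelled once the first task launches. Names are hypothetical.
import java.util.{Timer, TimerTask}

class CancellableStarvationWarning(timeoutMs: Long) {
  private val timer = new Timer(true) // daemon thread

  private val task = new TimerTask {
    override def run(): Unit =
      println("WARN: Initial TaskSet has not accepted any offers. " +
        "Check the scheduler UI to ensure slaves are registered.")
  }

  def start(): Unit = timer.scheduleAtFixedRate(task, timeoutMs, timeoutMs)

  // Call this at the point where the diff sets hasLaunchedTask = true.
  def taskLaunched(): Unit = {
    task.cancel()  // stops any future executions of this task
    timer.purge()  // drops cancelled tasks from the timer's queue
  }
}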