author     Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-22 15:26:55 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-22 15:27:41 -0800
commit     f33662c133d628d51718dec070a9096888330c58 (patch)
tree       14e9c66ab4674ebb4f68ab1c0d19ae1e0369b75b /core
parent     7341de0d48a4f9d11e13e3cd47c8fb2c3ab9d23e (diff)
parent     2ed791fd7fa193ea7e10d70e1c1b0787d584b0fd (diff)
Merge remote-tracking branch 'pwendell/starvation-check'

Also fixed a bug where the master was offering executors on dead workers.

Conflicts:
	core/src/main/scala/spark/deploy/master/Master.scala
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                13
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala  22
2 files changed, 32 insertions, 3 deletions
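
The Master.scala hunks below contain the dead-worker fix: only workers in the ALIVE state are considered when offering cores. The following is a minimal, self-contained sketch of that filter; WorkerState, WorkerInfo and the field names are simplified stand-ins for illustration, not the actual Spark classes.

// Minimal sketch of the dead-worker filter shown in the hunks below.
// WorkerState and WorkerInfo are simplified stand-ins for illustration only.
object WorkerFilterSketch {
  object WorkerState extends Enumeration {
    val ALIVE, DEAD = Value
  }
  case class WorkerInfo(id: String, state: WorkerState.Value, coresFree: Int)

  // Only ALIVE workers with spare cores should receive executor offers,
  // ordered so the freest workers are tried first (as in the spread-out path).
  def usableWorkers(workers: Seq[WorkerInfo]): Seq[WorkerInfo] =
    workers
      .filter(w => w.state == WorkerState.ALIVE && w.coresFree > 0)
      .sortBy(_.coresFree)(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    val workers = Seq(
      WorkerInfo("w1", WorkerState.ALIVE, 4),
      WorkerInfo("w2", WorkerState.DEAD, 8),   // dead worker: must be skipped
      WorkerInfo("w3", WorkerState.ALIVE, 2))
    println(usableWorkers(workers).map(_.id))  // List(w1, w3)
  }
}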
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 1cd68a2aa6..dda25463c7 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -33,6 +33,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
val waitingApps = new ArrayBuffer[ApplicationInfo]
val completedApps = new ArrayBuffer[ApplicationInfo]
+ var firstApp: Option[ApplicationInfo] = None
+
val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else ip
@@ -167,7 +169,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
// Try to spread out each app among all the nodes, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(canUse(app, _)).sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -190,7 +192,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
} else {
// Pack each app into as few nodes as possible until we've assigned all its cores
- for (worker <- workers if worker.coresFree > 0) {
+ for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
if (canUse(app, worker)) {
val coresToUse = math.min(worker.coresFree, app.coresLeft)
@@ -203,6 +205,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
}
+ if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
+ firstApp != None && firstApp.get.executors.size == 0) {
+ logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
+ }
}
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -245,6 +251,9 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
idToApp(app.id) = app
actorToApp(driver) = app
addressToApp(driver.path.address) = app
+ if (firstApp == None) {
+ firstApp = Some(app)
+ }
return app
}
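
The Master.scala hunks above also track the first registered application and warn when alive workers exist but that application never received an executor. A hedged sketch of that check follows; AppInfo, MemoryWarningCheck and the method names are illustrative placeholders rather than Spark's own classes.

// Hedged sketch of the "first app got no executors" warning added above.
// AppInfo and MemoryWarningCheck are illustrative placeholders, not Spark code.
case class AppInfo(id: String, executors: Map[Int, String])

class MemoryWarningCheck(log: String => Unit) {
  private var firstApp: Option[AppInfo] = None

  // Remember only the very first application registered with the master.
  def registerApp(app: AppInfo): Unit = {
    if (firstApp.isEmpty) firstApp = Some(app)
  }

  // After a scheduling pass: alive workers exist, yet the first app still has
  // no executors -- most likely no single worker has enough memory configured.
  def afterSchedulingPass(aliveWorkerCount: Int): Unit = {
    if (aliveWorkerCount > 0 && firstApp.exists(_.executors.isEmpty)) {
      log("Could not find any machines with enough memory. " +
        "Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
    }
  }
}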
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 1e4fbdb874..04d01e9ce8 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -11,6 +11,7 @@ import spark.TaskState.TaskState
import spark.scheduler._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.{TimerTask, Timer}
/**
* The main TaskScheduler implementation, for running tasks on a cluster. Clients should first call
@@ -22,6 +23,8 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ // Threshold above which we warn user initial TaskSet may be starved
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -30,6 +33,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
val taskIdToExecutorId = new HashMap[Long, String]
val taskSetTaskIds = new HashMap[String, HashSet[Long]]
+ var hasReceivedTask = false
+ var hasLaunchedTask = false
+ val starvationTimer = new Timer(true)
+
// Incrementing Mesos task IDs
val nextTaskId = new AtomicLong(0)
@@ -94,6 +101,18 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
activeTaskSets(taskSet.id) = manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id) = new HashSet[Long]()
+
+ if (hasReceivedTask == false) {
+ starvationTimer.scheduleAtFixedRate(new TimerTask() {
+ override def run() {
+ if (!hasLaunchedTask) {
+ logWarning("Initial TaskSet has not accepted any offers. " +
+ "Check the scheduler UI to ensure slaves are registered.")
+ }
+ }
+ }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
+ }
+ hasReceivedTask = true;
}
backend.reviveOffers()
}
@@ -150,6 +169,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
+ if (tasks.size > 0) hasLaunchedTask = true
return tasks
}
}
@@ -235,7 +255,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
override def defaultParallelism() = backend.defaultParallelism()
-
+
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
var shouldRevive = false
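
For reference, the starvation check wired into submitTasks and resourceOffers above can be read in isolation roughly as follows. The surrounding class and method names are simplified stand-ins; the Timer/TimerTask usage and the warning text mirror the patch, and, as in the patch, the timer keeps firing after the first launch but stays silent once a task has been launched.

// Sketch of the starvation check added to ClusterScheduler above.
// The class is a simplified stand-in; Timer/TimerTask usage mirrors the patch.
import java.util.{Timer, TimerTask}

class StarvationCheck(timeoutMs: Long) {
  @volatile private var hasLaunchedTask = false
  private var hasReceivedTask = false
  private val starvationTimer = new Timer(true)  // daemon thread, as in the patch

  // Called when the first task set is submitted: start a periodic check that
  // warns until at least one task has been launched on some executor.
  def onTaskSetSubmitted(): Unit = synchronized {
    if (!hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = {
          if (!hasLaunchedTask) {
            println("WARN: Initial TaskSet has not accepted any offers. " +
              "Check the scheduler UI to ensure slaves are registered.")
          }
          // As in the patch, the timer keeps running but logs nothing once
          // a task has launched.
        }
      }, timeoutMs, timeoutMs)
      hasReceivedTask = true
    }
  }

  // Called after a resource-offer round that handed out at least one task.
  def tasksLaunched(count: Int): Unit = {
    if (count > 0) hasLaunchedTask = true
  }
}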