diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-22 15:51:37 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-22 15:51:37 -0800 |
commit | d4d7993bf5106545ae1056fb6e8d7e2601f60535 (patch) | |
tree | 0fc3483585e26c19349d1ea7fc6aa457a9441238 | |
parent | f33662c133d628d51718dec070a9096888330c58 (diff) | |
download | spark-d4d7993bf5106545ae1056fb6e8d7e2601f60535.tar.gz spark-d4d7993bf5106545ae1056fb6e8d7e2601f60535.tar.bz2 spark-d4d7993bf5106545ae1056fb6e8d7e2601f60535.zip |
Several fixes to the work to log when no resources can be used by a job.
Fixed some of the messages as well as code style.
-rw-r--r-- | core/src/main/scala/spark/deploy/master/Master.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 12 |
2 files changed, 12 insertions, 8 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index dda25463c7..b7f167425f 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -205,10 +205,6 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor } } } - if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 && - firstApp != None && firstApp.get.executors.size == 0) { - logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.") - } } def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { @@ -254,6 +250,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor if (firstApp == None) { firstApp = Some(app) } + val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray + if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) { + logWarning("Could not find any workers with enough memory for " + firstApp.get.id) + } return app } diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala index 04d01e9ce8..d9c2f9517b 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala @@ -24,7 +24,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext) // How often to check for speculative tasks val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong // Threshold above which we warn user initial TaskSet may be starved - val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong + val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong val activeTaskSets = new HashMap[String, TaskSetManager] var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager] @@ -106,8 +106,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext) starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { - logWarning("Initial TaskSet has not accepted any offers. " + - "Check the scheduler UI to ensure slaves are registered.") + logWarning("Initial job has not accepted any resources; " + + "check your cluster UI to ensure that workers are registered") + } else { + this.cancel() } } }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) @@ -169,7 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext) } } while (launchedTask) } - if (tasks.size > 0) hasLaunchedTask = true + if (tasks.size > 0) { + hasLaunchedTask = true + } return tasks } } |