From d4d7993bf5106545ae1056fb6e8d7e2601f60535 Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 22 Feb 2013 15:51:37 -0800
Subject: Several fixes to the work to log when no resources can be used by a job.

Fixed some of the messages as well as code style.
---
 core/src/main/scala/spark/deploy/master/Master.scala          |  8 ++++----
 .../scala/spark/scheduler/cluster/ClusterScheduler.scala      | 12 ++++++++----
 2 files changed, 12 insertions(+), 8 deletions(-)

(limited to 'core')

diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index dda25463c7..b7f167425f 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -205,10 +205,6 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         }
       }
     }
-    if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
-        firstApp != None && firstApp.get.executors.size == 0) {
-      logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
-    }
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -254,6 +250,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     if (firstApp == None) {
       firstApp = Some(app)
     }
+    val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+      logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
+    }
     return app
   }
 
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 04d01e9ce8..d9c2f9517b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,7 +24,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
   // How often to check for speculative tasks
   val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
   // Threshold above which we warn user initial TaskSet may be starved
-  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
+  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
 
   val activeTaskSets = new HashMap[String, TaskSetManager]
   var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -106,8 +106,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       starvationTimer.scheduleAtFixedRate(new TimerTask() {
         override def run() {
           if (!hasLaunchedTask) {
-            logWarning("Initial TaskSet has not accepted any offers. " +
-              "Check the scheduler UI to ensure slaves are registered.")
+            logWarning("Initial job has not accepted any resources; " +
+              "check your cluster UI to ensure that workers are registered")
+          } else {
+            this.cancel()
           }
         }
       }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
@@ -169,7 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
          }
        } while (launchedTask)
      }
-     if (tasks.size > 0) hasLaunchedTask = true
+     if (tasks.size > 0) {
+       hasLaunchedTask = true
+     }
      return tasks
    }
  }
--
cgit v1.2.3
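
Note on the ClusterScheduler change above: the patch turns the starvation warning into a self-cancelling periodic timer, so the warning repeats every STARVATION_TIMEOUT milliseconds until the first task launches and then stops. The following standalone Scala sketch (not part of the patch; the object name StarvationWarningSketch and its helper methods are hypothetical, and println stands in for Spark's logWarning) illustrates the same java.util.Timer pattern under those assumptions.

import java.util.{Timer, TimerTask}

// Standalone sketch of the self-cancelling starvation-warning timer pattern.
// `StarvationWarningSketch`, `startWarningTimer`, and `markTaskLaunched` are
// illustrative names, not part of the Spark patch above.
object StarvationWarningSketch {
  // Same system property and 15-second default as the patched ClusterScheduler.
  val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong

  private val starvationTimer = new Timer("starvation-timer", true)  // daemon thread
  @volatile private var hasLaunchedTask = false

  def startWarningTimer() {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          // Warn repeatedly until a task is launched; println stands in for logWarning.
          println("WARN: Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered")
        } else {
          // Once work is running the warning is obsolete, so the timer task
          // cancels itself rather than firing forever.
          this.cancel()
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }

  // Call this from the code path that hands tasks to executors.
  def markTaskLaunched() {
    hasLaunchedTask = true
  }
}

In the patch itself, the equivalent of markTaskLaunched() is the `tasks.size > 0` branch of resourceOffers setting hasLaunchedTask, which lets the timer task cancel itself on its next tick.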