author    Matei Zaharia <matei@eecs.berkeley.edu>    2013-02-22 15:51:37 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>    2013-02-22 15:51:37 -0800
commit d4d7993bf5106545ae1056fb6e8d7e2601f60535 (patch)
tree   0fc3483585e26c19349d1ea7fc6aa457a9441238 /core
parent f33662c133d628d51718dec070a9096888330c58 (diff)
Several fixes to the recent work that logs when no resources can be used by a job.
Fixed some of the messages as well as the code style.
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/scala/spark/deploy/master/Master.scala               |  8 ++++----
-rw-r--r-- core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala | 12 ++++++++----
2 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index dda25463c7..b7f167425f 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -205,10 +205,6 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
}
}
}
- if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
- firstApp != None && firstApp.get.executors.size == 0) {
- logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
- }
}
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -254,6 +250,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
if (firstApp == None) {
firstApp = Some(app)
}
+ val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
+ }
return app
}
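
The Master.scala change above removes the old post-scheduling warning (which compared SPARK_WORKER_MEM against SPARK_MEM) and instead warns at application-registration time when alive workers exist but none has enough free memory for the app's per-slave request. As a rough illustration only, here is a minimal standalone Scala sketch of that check; WorkerInfo, AppDescription, logWarning and warnIfNoWorkerFits below are simplified stand-ins for illustration, not the actual Spark classes.

// Sketch only: simplified stand-ins for the real Master/WorkerInfo types.
object MemoryCheckSketch {
  object WorkerState extends Enumeration { val ALIVE, DEAD = Value }

  case class WorkerInfo(id: String, state: WorkerState.Value, memoryFree: Int)
  case class AppDescription(id: String, memoryPerSlave: Int)

  def logWarning(msg: String) { println("WARN: " + msg) }

  // Warn only when at least one worker is alive but none can fit the app.
  def warnIfNoWorkerFits(workers: Seq[WorkerInfo], app: AppDescription) {
    val workersAlive = workers.filter(_.state == WorkerState.ALIVE)
    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.memoryPerSlave)) {
      logWarning("Could not find any workers with enough memory for " + app.id)
    }
  }

  def main(args: Array[String]) {
    // One alive worker with 512 MB free cannot fit a 1024 MB per-slave request.
    val workers = Seq(WorkerInfo("w1", WorkerState.ALIVE, memoryFree = 512))
    warnIfNoWorkerFits(workers, AppDescription("app-0001", memoryPerSlave = 1024))
  }
}
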
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 04d01e9ce8..d9c2f9517b 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -24,7 +24,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// How often to check for speculative tasks
val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "5000").toLong
+ val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
val activeTaskSets = new HashMap[String, TaskSetManager]
var activeTaskSetsQueue = new ArrayBuffer[TaskSetManager]
@@ -106,8 +106,10 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
- logWarning("Initial TaskSet has not accepted any offers. " +
- "Check the scheduler UI to ensure slaves are registered.")
+ logWarning("Initial job has not accepted any resources; " +
+ "check your cluster UI to ensure that workers are registered")
+ } else {
+ this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
@@ -169,7 +171,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
- if (tasks.size > 0) hasLaunchedTask = true
+ if (tasks.size > 0) {
+ hasLaunchedTask = true
+ }
return tasks
}
}
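
The ClusterScheduler change raises the default starvation timeout from 5 to 15 seconds, rewords the warning, and makes the repeating timer cancel itself once the first task has launched. A minimal self-contained sketch of that pattern follows, using java.util.Timer directly; StarvationWatch, start and taskLaunched are hypothetical names for illustration, not the real scheduler's API.

import java.util.{Timer, TimerTask}

// Sketch only: the self-cancelling starvation warning, outside the scheduler.
class StarvationWatch {
  // Default raised from 5000 to 15000 ms in this commit.
  val starvationTimeout = System.getProperty("spark.starvation.timeout", "15000").toLong
  @volatile var hasLaunchedTask = false
  private val timer = new Timer("starvation-timer", true)  // daemon thread

  def start() {
    timer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          println("WARN: Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered")
        } else {
          this.cancel()  // stop warning once the first task has gone out
        }
      }
    }, starvationTimeout, starvationTimeout)
  }

  def taskLaunched() { hasLaunchedTask = true }
}

Since the threshold is read from a system property, the old 5-second behaviour can still be restored at launch time by passing -Dspark.starvation.timeout=5000 to the JVM.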