author     Patrick Wendell <pwendell@gmail.com>    2013-02-03 12:17:20 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-02-03 12:45:10 -0800
commit     b14322956cbf268b0c880f17188af24ba4884d5b (patch)
tree       a49fa3aadb1490920d18c7110210bb8a412725c8 /core/src
parent     667860448ad5f705dd7548263cf7f240def25d87 (diff)
download   spark-b14322956cbf268b0c880f17188af24ba4884d5b.tar.gz
           spark-b14322956cbf268b0c880f17188af24ba4884d5b.tar.bz2
           spark-b14322956cbf268b0c880f17188af24ba4884d5b.zip
Starvation check in Standalone scheduler
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala | 8
1 file changed, 8 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index c618e87cdd..8513dcefa0 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -31,6 +31,8 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
   val waitingJobs = new ArrayBuffer[JobInfo]
   val completedJobs = new ArrayBuffer[JobInfo]
 
+  var firstJob: Option[JobInfo] = None
+
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else ip
@@ -191,6 +193,11 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
         }
       }
     }
+    if (workers.toArray.filter(_.state == WorkerState.ALIVE).size > 0 &&
+        firstJob.isDefined &&
+        firstJob.get.executors.size == 0) {
+      logWarning("Could not find any machines with enough memory. Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
+    }
   }
 
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
@@ -232,6 +239,7 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
     idToJob(job.id) = job
     actorToJob(driver) = job
     addressToJob(driver.path.address) = job
+    if (!firstJob.isDefined) firstJob = Some(job)
     return job
   }
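
For context, the logic this commit adds boils down to: after a scheduling pass, if at least one worker is alive but the first job ever submitted still has zero executors, log a warning that the job's memory request probably exceeds what any worker can offer. Below is a minimal, self-contained Scala sketch of that check; WorkerInfo, JobInfo, and isStarved here are simplified stand-ins for illustration, not the real spark.deploy.master classes.

// Sketch of the starvation check added in this commit.
// The case classes below are assumed stand-ins, not Spark's actual types.
object StarvationCheckSketch {
  sealed trait WorkerState
  case object ALIVE extends WorkerState
  case object DEAD extends WorkerState

  case class WorkerInfo(state: WorkerState)                    // stand-in for the Master's WorkerInfo
  case class JobInfo(executors: Map[Int, String] = Map.empty)  // stand-in; only the executor map matters here

  // True when at least one worker is alive yet the first registered job
  // still has no executors, i.e. no worker could satisfy its memory request.
  def isStarved(workers: Seq[WorkerInfo], firstJob: Option[JobInfo]): Boolean =
    workers.count(_.state == ALIVE) > 0 &&
      firstJob.isDefined &&
      firstJob.get.executors.isEmpty

  def main(args: Array[String]): Unit = {
    val workers = Seq(WorkerInfo(ALIVE), WorkerInfo(DEAD))
    val firstJob: Option[JobInfo] = Some(JobInfo())  // registered, but never got an executor
    if (isStarved(workers, firstJob)) {
      println("WARN: Could not find any machines with enough memory. " +
              "Ensure that SPARK_WORKER_MEM > SPARK_MEM.")
    }
  }
}

Because the real check sits at the end of Master.schedule(), the warning fires on each scheduling pass while the condition holds, i.e. until the first job acquires an executor.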