aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-02 23:58:23 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-02 23:58:23 -0800
commit3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7 (patch)
treea188a44a9b12f8c52c8c3ec27445b3e04b767b19
parent88ee6163a1b5d6fccfbd6c7e9ae6fd4968b0695e (diff)
parentcae8a6795c7f454b74c8d3c4425a6ced151d6d9b (diff)
downloadspark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.tar.gz
spark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.tar.bz2
spark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.zip
Merge pull request #379 from stephenh/sparkmem
Add spark.executor.memory to differentiate executor memory from spark-shell
-rw-r--r--core/src/main/scala/spark/SparkContext.scala3
-rw-r--r--core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala3
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala12
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala10
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala10
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala14
6 files changed, 17 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index b95049da0c..0efc00d5dd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -108,8 +108,9 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
+ // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
- "SPARK_TESTING")) {
+ "SPARK_TESTING")) {
val value = System.getenv(key)
if (value != null) {
executorEnvs(key) = value
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index f5ff267d44..4ef637090c 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -113,8 +113,7 @@ private[spark] class ExecutorRunner(
for ((key, value) <- jobDesc.command.environment) {
env.put(key, value)
}
- env.put("SPARK_CORES", cores.toString)
- env.put("SPARK_MEMORY", memory.toString)
+ env.put("SPARK_MEM", memory.toString + "m")
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
index ddcd64d7c6..9ac875de3a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -1,5 +1,7 @@
package spark.scheduler.cluster
+import spark.Utils
+
/**
* A backend interface for cluster scheduling systems that allows plugging in different ones under
* ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
@@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend {
def reviveOffers(): Unit
def defaultParallelism(): Int
+ // Memory used by each executor (in megabytes)
+ protected val executorMemory = {
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ Option(System.getProperty("spark.executor.memory"))
+ .orElse(Option(System.getenv("SPARK_MEM")))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+ }
+
+
// TODO: Probably want to add a killTask too
}
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9760d23072..59ff8bcb90 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -20,16 +20,6 @@ private[spark] class SparkDeploySchedulerBackend(
val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
- // Memory used by each executor (in megabytes)
- val executorMemory = {
- if (System.getenv("SPARK_MEM") != null) {
- Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- } else {
- 512
- }
- }
-
override def start() {
super.start()
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 7bf56a05d6..b481ec0a72 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend(
val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
- // Memory used by each executor (in megabytes)
- val executorMemory = {
- if (System.getenv("SPARK_MEM") != null) {
- Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- } else {
- 512
- }
- }
-
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index eab1c60e0b..300766d0f5 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with Logging {
- // Memory used by each executor (in megabytes)
- val EXECUTOR_MEMORY = {
- if (System.getenv("SPARK_MEM") != null) {
- Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- } else {
- 512
- }
- }
-
// Lock used to wait for scheduler to be registered
var isRegistered = false
val registeredLock = new Object()
@@ -89,7 +79,7 @@ private[spark] class MesosSchedulerBackend(
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build())
+ .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
.build()
val command = CommandInfo.newBuilder()
.setValue(execScript)
@@ -161,7 +151,7 @@ private[spark] class MesosSchedulerBackend(
def enoughMemory(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
val slaveId = o.getSlaveId.getValue
- mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId)
+ mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
}
for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {