diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-02 23:58:23 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-02 23:58:23 -0800 |
commit | 3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7 (patch) | |
tree | a188a44a9b12f8c52c8c3ec27445b3e04b767b19 | |
parent | 88ee6163a1b5d6fccfbd6c7e9ae6fd4968b0695e (diff) | |
parent | cae8a6795c7f454b74c8d3c4425a6ced151d6d9b (diff) | |
download | spark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.tar.gz spark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.tar.bz2 spark-3bfaf3ab1d14a85a749f87f1bcd37e553e8440e7.zip |
Merge pull request #379 from stephenh/sparkmem
Add spark.executor.memory to differentiate executor memory from spark-shell
6 files changed, 17 insertions, 35 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index b95049da0c..0efc00d5dd 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -108,8 +108,9 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() + // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", - "SPARK_TESTING")) { + "SPARK_TESTING")) { val value = System.getenv(key) if (value != null) { executorEnvs(key) = value diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index f5ff267d44..4ef637090c 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -113,8 +113,7 @@ private[spark] class ExecutorRunner( for ((key, value) <- jobDesc.command.environment) { env.put(key, value) } - env.put("SPARK_CORES", cores.toString) - env.put("SPARK_MEMORY", memory.toString) + env.put("SPARK_MEM", memory.toString + "m") // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command env.put("SPARK_LAUNCH_WITH_SCALA", "0") diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala index ddcd64d7c6..9ac875de3a 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala @@ -1,5 +1,7 @@ package spark.scheduler.cluster +import spark.Utils + /** * A backend interface for cluster scheduling systems that allows plugging in different ones under * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as @@ -11,5 +13,15 @@ private[spark] trait SchedulerBackend { def reviveOffers(): Unit def defaultParallelism(): Int + // Memory used by each executor (in megabytes) + protected val executorMemory = { + // TODO: Might need to add some extra memory for the non-heap parts of the JVM + Option(System.getProperty("spark.executor.memory")) + .orElse(Option(System.getenv("SPARK_MEM"))) + .map(Utils.memoryStringToMb) + .getOrElse(512) + } + + // TODO: Probably want to add a killTask too } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9760d23072..59ff8bcb90 100644 --- a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -20,16 +20,6 @@ private[spark] class SparkDeploySchedulerBackend( val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt - // Memory used by each executor (in megabytes) - val executorMemory = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } - } - override def start() { super.start() diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 7bf56a05d6..b481ec0a72 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -35,16 +35,6 @@ private[spark] class CoarseMesosSchedulerBackend( val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures - // Memory used by each executor (in megabytes) - val executorMemory = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } - } - // Lock used to wait for scheduler to be registered var isRegistered = false val registeredLock = new Object() diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index eab1c60e0b..300766d0f5 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -29,16 +29,6 @@ private[spark] class MesosSchedulerBackend( with MScheduler with Logging { - // Memory used by each executor (in megabytes) - val EXECUTOR_MEMORY = { - if (System.getenv("SPARK_MEM") != null) { - Utils.memoryStringToMb(System.getenv("SPARK_MEM")) - // TODO: Might need to add some extra memory for the non-heap parts of the JVM - } else { - 512 - } - } - // Lock used to wait for scheduler to be registered var isRegistered = false val registeredLock = new Object() @@ -89,7 +79,7 @@ private[spark] class MesosSchedulerBackend( val memory = Resource.newBuilder() .setName("mem") .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(EXECUTOR_MEMORY).build()) + .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build()) .build() val command = CommandInfo.newBuilder() .setValue(execScript) @@ -161,7 +151,7 @@ private[spark] class MesosSchedulerBackend( def enoughMemory(o: Offer) = { val mem = getResource(o.getResourcesList, "mem") val slaveId = o.getSlaveId.getValue - mem >= EXECUTOR_MEMORY || slaveIdsWithExecutors.contains(slaveId) + mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId) } for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) { |