author    Matei Zaharia <matei@eecs.berkeley.edu>	2013-06-30 15:38:58 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>	2013-06-30 15:46:46 -0700
commit    03d0b858c807339b4221bedffa29ac76eef5352e (patch)
tree      3235e3d155dfc6eb0b55a36046492f653ab41346 /core/src
parent    ccfe953a4db25c920157554a2cd820f8afb41ca3 (diff)
Made the use of the spark.executor.memory setting consistent and documented it
Conflicts:
	core/src/main/scala/spark/SparkContext.scala
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                         24
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala   11
2 files changed, 18 insertions(+), 17 deletions(-)
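For context on what this change looks like from the application side (not part of this commit; the master URL, app name, and the "2g" value below are placeholders), an application can now request executor memory through the spark.executor.memory system property instead of the SPARK_MEM environment variable:

    // Illustrative sketch only: set the property before constructing the
    // SparkContext so it is picked up by executorMemoryRequested.
    System.setProperty("spark.executor.memory", "2g")  // "2g" is a placeholder value
    val sc = new SparkContext("spark://master:7077", "MemoryDemo")
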
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 70a9d7698c..366afb2a2a 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -115,13 +115,14 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
- for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
- "SPARK_TESTING")) {
+ for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
val value = System.getenv(key)
if (value != null) {
executorEnvs(key) = value
}
}
+ // Since memory can be set with a system property too, use that
+ executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
if (environment != null) {
executorEnvs ++= environment
}
@@ -156,14 +157,12 @@ class SparkContext(
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
- // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+ // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
- val sparkMemEnv = System.getenv("SPARK_MEM")
- val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
- if (sparkMemEnvInt > memoryPerSlaveInt) {
+ if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
throw new SparkException(
- "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
- memoryPerSlaveInt, sparkMemEnvInt))
+ "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+ memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}
val scheduler = new ClusterScheduler(this)
@@ -881,6 +880,15 @@ object SparkContext {
/** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
+
+ /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
+ private[spark] val executorMemoryRequested = {
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ Option(System.getProperty("spark.executor.memory"))
+ .orElse(Option(System.getenv("SPARK_MEM")))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+ }
}
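To make the precedence introduced above explicit (this is a standalone restatement, not code from the commit; memoryStringToMb below is a simplified stand-in for spark.Utils.memoryStringToMb), the requested executor memory resolves in this order: the spark.executor.memory system property, then the legacy SPARK_MEM environment variable, then a 512 MB default:

    // Simplified stand-in for Utils.memoryStringToMb: handles "512m" / "2g" style strings.
    def memoryStringToMb(s: String): Int = {
      val lower = s.toLowerCase
      if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
      else if (lower.endsWith("m")) lower.dropRight(1).toInt
      else lower.toInt  // assume the value is already in MB
    }

    // Resolution order used by SparkContext.executorMemoryRequested:
    // 1. spark.executor.memory system property
    // 2. SPARK_MEM environment variable (legacy)
    // 3. 512 MB default
    val executorMemoryRequested: Int =
      Option(System.getProperty("spark.executor.memory"))
        .orElse(Option(System.getenv("SPARK_MEM")))
        .map(memoryStringToMb)
        .getOrElse(512)
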
diff --git a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
index 9ac875de3a..8844057a5c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SchedulerBackend.scala
@@ -1,6 +1,6 @@
package spark.scheduler.cluster
-import spark.Utils
+import spark.{SparkContext, Utils}
/**
* A backend interface for cluster scheduling systems that allows plugging in different ones under
@@ -14,14 +14,7 @@ private[spark] trait SchedulerBackend {
def defaultParallelism(): Int
// Memory used by each executor (in megabytes)
- protected val executorMemory = {
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Option(System.getProperty("spark.executor.memory"))
- .orElse(Option(System.getenv("SPARK_MEM")))
- .map(Utils.memoryStringToMb)
- .getOrElse(512)
- }
-
+ protected val executorMemory: Int = SparkContext.executorMemoryRequested
// TODO: Probably want to add a killTask too
}