Diffstat (limited to 'core/src/main/scala/spark/SparkContext.scala')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala  |  61
1 file changed, 44 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bc05d08fd6..46b9935cb7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package spark
import java.io._
@@ -36,6 +53,7 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
@@ -46,9 +64,9 @@ import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-
+import ui.{SparkUI}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -94,11 +112,6 @@ class SparkContext(
isLocal)
SparkEnv.set(env)
- // Start the BlockManager UI
- private[spark] val ui = new BlockManagerUI(
- env.actorSystem, env.blockManager.master.driverActor, this)
- ui.start()
-
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
@@ -107,6 +120,9 @@ class SparkContext(
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
+ // Initialize the Spark UI
+ private[spark] val ui = new SparkUI(this)
+ ui.bind()
// Add each JAR given through the constructor
if (jars != null) {
@@ -116,13 +132,14 @@ class SparkContext(
// Environment variables to pass to our executors
private[spark] val executorEnvs = HashMap[String, String]()
// Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
- for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
- "SPARK_TESTING")) {
+ for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
val value = System.getenv(key)
if (value != null) {
executorEnvs(key) = value
}
}
+ // Since memory can be set with a system property too, use that
+ executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
if (environment != null) {
executorEnvs ++= environment
}
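
The hunk above stops forwarding SPARK_MEM verbatim from the driver's environment and instead writes a value derived from the resolved request (see the new `executorMemoryRequested` at the end of this diff). A minimal, self-contained sketch of the same pass-through-plus-override pattern; the object name `ExecutorEnvSketch` and the `requestedMemoryMb` parameter are illustrative, not part of the actual code:

```scala
import scala.collection.mutable.HashMap

// Illustrative sketch: forward a whitelist of env vars to executors, then
// override SPARK_MEM with the resolved value rather than the driver's env.
object ExecutorEnvSketch {
  def build(requestedMemoryMb: Int): HashMap[String, String] = {
    val executorEnvs = HashMap[String, String]()
    for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
      val value = System.getenv(key)
      if (value != null) {
        executorEnvs(key) = value
      }
    }
    // Memory may arrive via a system property instead of the environment,
    // so always write the normalized megabyte value.
    executorEnvs("SPARK_MEM") = s"${requestedMemoryMb}m"
    executorEnvs
  }
}
```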
@@ -157,14 +174,12 @@ class SparkContext(
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
- // Check to make sure SPARK_MEM <= memoryPerSlave. Otherwise Spark will just hang.
+ // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
- val sparkMemEnv = System.getenv("SPARK_MEM")
- val sparkMemEnvInt = if (sparkMemEnv != null) Utils.memoryStringToMb(sparkMemEnv) else 512
- if (sparkMemEnvInt > memoryPerSlaveInt) {
+ if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
throw new SparkException(
- "Slave memory (%d MB) cannot be smaller than SPARK_MEM (%d MB)".format(
- memoryPerSlaveInt, sparkMemEnvInt))
+ "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
+ memoryPerSlaveInt, SparkContext.executorMemoryRequested))
}
val scheduler = new ClusterScheduler(this)
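
The guard above fails fast instead of letting the job hang when a local-cluster worker has less RAM than an executor requests. A hedged sketch of the check in isolation; `MemoryGuard` and the local `SparkException` stand-in are sketch names, not the real classes:

```scala
// Illustrative guard mirroring the local-cluster memory check above.
object MemoryGuard {
  class SparkException(msg: String) extends Exception(msg)

  def checkWorkerMemory(memoryPerSlaveMb: Int, requestedMb: Int): Unit = {
    // If each local-cluster worker has less RAM than an executor asks for,
    // no executor can ever register, so fail fast instead of hanging.
    if (requestedMb > memoryPerSlaveMb) {
      throw new SparkException(
        "Asked to launch cluster with %d MB RAM/worker but requested %d MB/worker"
          .format(memoryPerSlaveMb, requestedMb))
    }
  }

  def main(args: Array[String]): Unit = {
    checkWorkerMemory(memoryPerSlaveMb = 1024, requestedMb = 512) // passes
    // checkWorkerMemory(memoryPerSlaveMb = 256, requestedMb = 512) // would throw
  }
}
```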
@@ -216,6 +231,8 @@ class SparkContext(
@volatile private var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
+ ui.start()
+
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val conf = SparkHadoopUtil.newConfiguration()
@@ -578,6 +595,7 @@ class SparkContext(
/** Shut down the SparkContext. */
def stop() {
+ ui.stop()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
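
Taken together, the UI hunks split the old single `start()` into three phases: `bind()` claims the HTTP port during SparkContext construction, `start()` runs once the DAGScheduler is up, and `stop()` is the first step of shutdown. A minimal sketch of that lifecycle under assumed names (`WebUILifecycle` and `SketchUI` are not the actual SparkUI API):

```scala
// Illustrative three-phase lifecycle mirroring the bind/start/stop split.
trait WebUILifecycle {
  def bind(): Unit   // reserve the HTTP port early, before dependencies exist
  def start(): Unit  // attach handlers once the scheduler is running
  def stop(): Unit   // release the port during context shutdown
}

class SketchUI extends WebUILifecycle {
  private var bound = false
  def bind(): Unit  = { bound = true; println("port reserved") }
  def start(): Unit = { require(bound, "bind() must be called first"); println("UI serving") }
  def stop(): Unit  = { bound = false; println("UI stopped") }
}
```

Binding before the scheduler exists means the UI's address is claimed even if later construction steps are slow, which is why `ui.bind()` runs in the constructor while `ui.start()` waits for the DAGScheduler.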
@@ -630,7 +648,7 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
- val callSite = Utils.getSparkCallSite
+ val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
@@ -713,7 +731,7 @@ class SparkContext(
func: (TaskContext, Iterator[T]) => U,
evaluator: ApproximateEvaluator[U, R],
timeout: Long): PartialResult[R] = {
- val callSite = Utils.getSparkCallSite
+ val callSite = Utils.formatSparkCallSite
logInfo("Starting job: " + callSite)
val start = System.nanoTime
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
@@ -882,6 +900,15 @@ object SparkContext {
/** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
+
+ /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
+ private[spark] val executorMemoryRequested = {
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ Option(System.getProperty("spark.executor.memory"))
+ .orElse(Option(System.getenv("SPARK_MEM")))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+ }
}
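
The new `executorMemoryRequested` gives the `spark.executor.memory` system property precedence over the legacy `SPARK_MEM` environment variable and falls back to 512 MB. A self-contained sketch of the same fallback chain; `memoryStringToMb` here is a simplified stand-in for the real `Utils.memoryStringToMb`:

```scala
// Sketch of the memory-resolution chain; memoryStringToMb is a simplified
// stand-in for the real Utils helper.
object ExecutorMemory {
  def memoryStringToMb(str: String): Int = {
    val lower = str.toLowerCase
    if (lower.endsWith("g")) lower.dropRight(1).toInt * 1024
    else if (lower.endsWith("m")) lower.dropRight(1).toInt
    else lower.toInt // assume a plain megabyte count
  }

  val requestedMb: Int =
    Option(System.getProperty("spark.executor.memory")) // highest precedence
      .orElse(Option(System.getenv("SPARK_MEM")))       // legacy env var
      .map(memoryStringToMb)
      .getOrElse(512)                                   // default: 512 MB

  def main(args: Array[String]): Unit = {
    // -Dspark.executor.memory=2g resolves to 2048; SPARK_MEM=1g alone
    // resolves to 1024; with neither set it resolves to 512.
    println(s"$requestedMb MB per executor")
  }
}
```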