-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala                   | 505
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala              |  33
-rw-r--r--  core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala  |   7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala    |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala   |  19
-rw-r--r--  core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala |   6
6 files changed, 329 insertions(+), 244 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3f1a7dd99d..e106c5c4be 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,6 +31,7 @@ import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
+import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -50,9 +51,10 @@ import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
FixedLengthBinaryInputFormat}
import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
SparkDeploySchedulerBackend, SimrSchedulerBackend}
@@ -192,8 +194,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// log out Spark Version in Spark driver log
logInfo(s"Running Spark version $SPARK_VERSION")
- private[spark] val conf = config.clone()
- conf.validateSettings()
+ /* ------------------------------------------------------------------------------------- *
+ | Private variables. These variables keep the internal state of the context, and are |
+ | not accessible by the outside world. They're mutable since we want to initialize all |
+ | of them to some neutral value ahead of time, so that calling "stop()" while the |
+ | constructor is still running is safe. |
+ * ------------------------------------------------------------------------------------- */
+
+ private var _conf: SparkConf = _
+ private var _eventLogDir: Option[URI] = None
+ private var _eventLogCodec: Option[String] = None
+ private var _env: SparkEnv = _
+ private var _metadataCleaner: MetadataCleaner = _
+ private var _jobProgressListener: JobProgressListener = _
+ private var _statusTracker: SparkStatusTracker = _
+ private var _progressBar: Option[ConsoleProgressBar] = None
+ private var _ui: Option[SparkUI] = None
+ private var _hadoopConfiguration: Configuration = _
+ private var _executorMemory: Int = _
+ private var _schedulerBackend: SchedulerBackend = _
+ private var _taskScheduler: TaskScheduler = _
+ private var _heartbeatReceiver: RpcEndpointRef = _
+ @volatile private var _dagScheduler: DAGScheduler = _
+ private var _applicationId: String = _
+ private var _eventLogger: Option[EventLoggingListener] = None
+ private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
+ private var _cleaner: Option[ContextCleaner] = None
+ private var _listenerBusStarted: Boolean = false
+ private var _jars: Seq[String] = _
+ private var _files: Seq[String] = _
+
+ /* ------------------------------------------------------------------------------------- *
+ | Accessors and public fields. These provide access to the internal state of the |
+ | context. |
+ * ------------------------------------------------------------------------------------- */
+
+ private[spark] def conf: SparkConf = _conf
/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
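Editor's note: the hunk above replaces eagerly initialized vals with private vars plus read-only accessors, so every field has a neutral value before the constructor body runs and stop() can be called at any point during construction. A minimal sketch of that pattern, with illustrative (non-Spark) class and field names:

```scala
// Sketch of the "private vars + read-only accessors" shape used above.
// Names here are stand-ins, not Spark's.
class Service(config: Map[String, String]) {
  // Mutable internal state, initialized to neutral values so that a
  // half-constructed instance can still be shut down safely.
  private var _conf: Map[String, String] = _
  private var _workerCount: Int = _
  private var _started: Boolean = false

  // Accessors keep the mutability an implementation detail.
  def conf: Map[String, String] = _conf
  def workerCount: Int = _workerCount
  def started: Boolean = _started

  // The constructor body later assigns these fields inside a try block
  // (see the initialization sketch further down).
}
```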
@@ -201,65 +237,24 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
def getConf: SparkConf = conf.clone()
- if (!conf.contains("spark.master")) {
- throw new SparkException("A master URL must be set in your configuration")
- }
- if (!conf.contains("spark.app.name")) {
- throw new SparkException("An application name must be set in your configuration")
- }
-
- if (conf.getBoolean("spark.logConf", false)) {
- logInfo("Spark configuration:\n" + conf.toDebugString)
- }
-
- // Set Spark driver host and port system properties
- conf.setIfMissing("spark.driver.host", Utils.localHostName())
- conf.setIfMissing("spark.driver.port", "0")
-
- val jars: Seq[String] =
- conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
- val files: Seq[String] =
- conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
- val master = conf.get("spark.master")
- val appName = conf.get("spark.app.name")
+ def jars: Seq[String] = _jars
+ def files: Seq[String] = _files
+ def master: String = _conf.get("spark.master")
+ def appName: String = _conf.get("spark.app.name")
- private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
- private[spark] val eventLogDir: Option[URI] = {
- if (isEventLogEnabled) {
- val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
- .stripSuffix("/")
- Some(Utils.resolveURI(unresolvedDir))
- } else {
- None
- }
- }
- private[spark] val eventLogCodec: Option[String] = {
- val compress = conf.getBoolean("spark.eventLog.compress", false)
- if (compress && isEventLogEnabled) {
- Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
- } else {
- None
- }
- }
+ private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
+ private[spark] def eventLogDir: Option[URI] = _eventLogDir
+ private[spark] def eventLogCodec: Option[String] = _eventLogCodec
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
- conf.set("spark.tachyonStore.folderName", tachyonFolderName)
- val isLocal = (master == "local" || master.startsWith("local["))
-
- if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+ def isLocal: Boolean = (master == "local" || master.startsWith("local["))
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
- conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
-
- // Create the Spark execution environment (cache, map output tracker, etc)
-
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
@@ -268,8 +263,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
}
- private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
- SparkEnv.set(env)
+ private[spark] def env: SparkEnv = _env
// Used to store a URL for each static file/jar together with the file's local timestamp
private[spark] val addedFiles = HashMap[String, Long]()
@@ -277,35 +271,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
- private[spark] val metadataCleaner =
- new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
-
+ private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+ private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
- private[spark] val jobProgressListener = new JobProgressListener(conf)
- listenerBus.addListener(jobProgressListener)
+ def statusTracker: SparkStatusTracker = _statusTracker
- val statusTracker = new SparkStatusTracker(this)
+ private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
- private[spark] val progressBar: Option[ConsoleProgressBar] =
- if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
- Some(new ConsoleProgressBar(this))
- } else {
- None
- }
-
- // Initialize the Spark UI
- private[spark] val ui: Option[SparkUI] =
- if (conf.getBoolean("spark.ui.enabled", true)) {
- Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
- env.securityManager,appName))
- } else {
- // For tests, do not enable the UI
- None
- }
-
- // Bind the UI before starting the task scheduler to communicate
- // the bound port to the cluster manager properly
- ui.foreach(_.bind())
+ private[spark] def ui: Option[SparkUI] = _ui
/**
* A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
@@ -313,134 +286,248 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
* plan to set some global configurations for all Hadoop RDDs.
*/
- val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
+ def hadoopConfiguration: Configuration = _hadoopConfiguration
+
+ private[spark] def executorMemory: Int = _executorMemory
+
+ // Environment variables to pass to our executors.
+ private[spark] val executorEnvs = HashMap[String, String]()
+
+ // Set SPARK_USER for user who is running SparkContext.
+ val sparkUser = Utils.getCurrentUserName()
- // Add each JAR given through the constructor
- if (jars != null) {
- jars.foreach(addJar)
+ private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
+ private[spark] def schedulerBackend_=(sb: SchedulerBackend): Unit = {
+ _schedulerBackend = sb
}
- if (files != null) {
- files.foreach(addFile)
+ private[spark] def taskScheduler: TaskScheduler = _taskScheduler
+ private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
+ _taskScheduler = ts
}
+ private[spark] def dagScheduler: DAGScheduler = _dagScheduler
+ private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
+ _dagScheduler = ds
+ }
+
+ def applicationId: String = _applicationId
+
+ def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
+
+ private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger
+
+ private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
+ _executorAllocationManager
+
+ private[spark] def cleaner: Option[ContextCleaner] = _cleaner
+
+ private[spark] var checkpointDir: Option[String] = None
+
+ // Thread Local variable that can be used by users to pass information down the stack
+ private val localProperties = new InheritableThreadLocal[Properties] {
+ override protected def childValue(parent: Properties): Properties = new Properties(parent)
+ override protected def initialValue(): Properties = new Properties()
+ }
+
+ /* ------------------------------------------------------------------------------------- *
+ | Initialization. This code initializes the context in a manner that is exception-safe. |
+ | All internal fields holding state are initialized here, and any error prompts the |
+ | stop() method to be called. |
+ * ------------------------------------------------------------------------------------- */
+
private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
}
- private[spark] val executorMemory = conf.getOption("spark.executor.memory")
- .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
- .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
- .map(Utils.memoryStringToMb)
- .getOrElse(512)
+ try {
+ _conf = config.clone()
+ _conf.validateSettings()
- // Environment variables to pass to our executors.
- private[spark] val executorEnvs = HashMap[String, String]()
+ if (!_conf.contains("spark.master")) {
+ throw new SparkException("A master URL must be set in your configuration")
+ }
+ if (!_conf.contains("spark.app.name")) {
+ throw new SparkException("An application name must be set in your configuration")
+ }
- // Convert java options to env vars as a work around
- // since we can't set env vars directly in sbt.
- for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
- value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
- executorEnvs(envKey) = value
- }
- Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
- executorEnvs("SPARK_PREPEND_CLASSES") = v
- }
- // The Mesos scheduler backend relies on this environment variable to set executor memory.
- // TODO: Set this only in the Mesos scheduler.
- executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
- executorEnvs ++= conf.getExecutorEnv
+ if (_conf.getBoolean("spark.logConf", false)) {
+ logInfo("Spark configuration:\n" + _conf.toDebugString)
+ }
- // Set SPARK_USER for user who is running SparkContext.
- val sparkUser = Utils.getCurrentUserName()
- executorEnvs("SPARK_USER") = sparkUser
+ // Set Spark driver host and port system properties
+ _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+ _conf.setIfMissing("spark.driver.port", "0")
- // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
- // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
- private val heartbeatReceiver = env.rpcEnv.setupEndpoint(
- HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+ _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
- // Create and start the scheduler
- private[spark] var (schedulerBackend, taskScheduler) =
- SparkContext.createTaskScheduler(this, master)
+ _jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+ _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
+ .toSeq.flatten
- heartbeatReceiver.send(TaskSchedulerIsSet)
+ _eventLogDir =
+ if (isEventLogEnabled) {
+ val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+ .stripSuffix("/")
+ Some(Utils.resolveURI(unresolvedDir))
+ } else {
+ None
+ }
- @volatile private[spark] var dagScheduler: DAGScheduler = _
- try {
- dagScheduler = new DAGScheduler(this)
- } catch {
- case e: Exception => {
- try {
- stop()
- } finally {
- throw new SparkException("Error while constructing DAGScheduler", e)
+ _eventLogCodec = {
+ val compress = _conf.getBoolean("spark.eventLog.compress", false)
+ if (compress && isEventLogEnabled) {
+ Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
+ } else {
+ None
}
}
- }
- // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
- // constructor
- taskScheduler.start()
+ _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
- val applicationId: String = taskScheduler.applicationId()
- conf.set("spark.app.id", applicationId)
+ if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
- env.blockManager.initialize(applicationId)
+ // Create the Spark execution environment (cache, map output tracker, etc)
+ _env = createSparkEnv(_conf, isLocal, listenerBus)
+ SparkEnv.set(_env)
- val metricsSystem = env.metricsSystem
+ _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
- // The metrics system for Driver need to be set spark.app.id to app ID.
- // So it should start after we get app ID from the task scheduler and set spark.app.id.
- metricsSystem.start()
- // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
- metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+ _jobProgressListener = new JobProgressListener(_conf)
+ listenerBus.addListener(jobProgressListener)
- // Optionally log Spark events
- private[spark] val eventLogger: Option[EventLoggingListener] = {
- if (isEventLogEnabled) {
- val logger =
- new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
- logger.start()
- listenerBus.addListener(logger)
- Some(logger)
- } else None
- }
+ _statusTracker = new SparkStatusTracker(this)
- // Optionally scale number of executors dynamically based on workload. Exposed for testing.
- private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
- private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
- private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
- if (dynamicAllocationEnabled) {
- assert(supportDynamicAllocation,
- "Dynamic allocation of executors is currently only supported in YARN mode")
- Some(new ExecutorAllocationManager(this, listenerBus, conf))
- } else {
- None
+ _progressBar =
+ if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
+ Some(new ConsoleProgressBar(this))
+ } else {
+ None
+ }
+
+ _ui =
+ if (conf.getBoolean("spark.ui.enabled", true)) {
+ Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
+ _env.securityManager,appName))
+ } else {
+ // For tests, do not enable the UI
+ None
+ }
+ // Bind the UI before starting the task scheduler to communicate
+ // the bound port to the cluster manager properly
+ _ui.foreach(_.bind())
+
+ _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
+
+ // Add each JAR given through the constructor
+ if (jars != null) {
+ jars.foreach(addJar)
}
- executorAllocationManager.foreach(_.start())
- private[spark] val cleaner: Option[ContextCleaner] = {
- if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
- Some(new ContextCleaner(this))
- } else {
- None
+ if (files != null) {
+ files.foreach(addFile)
}
- }
- cleaner.foreach(_.start())
- setupAndStartListenerBus()
- postEnvironmentUpdate()
- postApplicationStart()
+ _executorMemory = _conf.getOption("spark.executor.memory")
+ .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+ .orElse(Option(System.getenv("SPARK_MEM"))
+ .map(warnSparkMem))
+ .map(Utils.memoryStringToMb)
+ .getOrElse(512)
+
+ // Convert java options to env vars as a work around
+ // since we can't set env vars directly in sbt.
+ for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+ value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
+ executorEnvs(envKey) = value
+ }
+ Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+ executorEnvs("SPARK_PREPEND_CLASSES") = v
+ }
+ // The Mesos scheduler backend relies on this environment variable to set executor memory.
+ // TODO: Set this only in the Mesos scheduler.
+ executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+ executorEnvs ++= _conf.getExecutorEnv
+ executorEnvs("SPARK_USER") = sparkUser
+
+ // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+ // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+ _heartbeatReceiver = env.rpcEnv.setupEndpoint(
+ HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+
+ // Create and start the scheduler
+ val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+ _schedulerBackend = sched
+ _taskScheduler = ts
+ _dagScheduler = new DAGScheduler(this)
+ _heartbeatReceiver.send(TaskSchedulerIsSet)
+
+ // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+ // constructor
+ _taskScheduler.start()
+
+ _applicationId = _taskScheduler.applicationId()
+ _conf.set("spark.app.id", _applicationId)
+ _env.blockManager.initialize(_applicationId)
+
+ // The metrics system for Driver need to be set spark.app.id to app ID.
+ // So it should start after we get app ID from the task scheduler and set spark.app.id.
+ metricsSystem.start()
+ // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
+ metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+
+ _eventLogger =
+ if (isEventLogEnabled) {
+ val logger =
+ new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+ logger.start()
+ listenerBus.addListener(logger)
+ Some(logger)
+ } else {
+ None
+ }
- private[spark] var checkpointDir: Option[String] = None
+ // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+ val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+ _executorAllocationManager =
+ if (dynamicAllocationEnabled) {
+ assert(supportDynamicAllocation,
+ "Dynamic allocation of executors is currently only supported in YARN mode")
+ Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+ } else {
+ None
+ }
+ _executorAllocationManager.foreach(_.start())
- // Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new InheritableThreadLocal[Properties] {
- override protected def childValue(parent: Properties): Properties = new Properties(parent)
- override protected def initialValue(): Properties = new Properties()
+ _cleaner =
+ if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
+ Some(new ContextCleaner(this))
+ } else {
+ None
+ }
+ _cleaner.foreach(_.start())
+
+ setupAndStartListenerBus()
+ postEnvironmentUpdate()
+ postApplicationStart()
+
+ // Post init
+ _taskScheduler.postStartHook()
+ _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
+ _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+ } catch {
+ case NonFatal(e) =>
+ logError("Error initializing SparkContext.", e)
+ try {
+ stop()
+ } catch {
+ case NonFatal(inner) =>
+ logError("Error stopping SparkContext after init error.", inner)
+ } finally {
+ throw e
+ }
}
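Editor's note: the initialization block above funnels all constructor work through one try: on any NonFatal error it attempts stop(), logs a failure of the cleanup itself, and rethrows the original exception. A hedged sketch of that shape (the names and the resource type are stand-ins, not Spark's API):

```scala
import scala.util.control.NonFatal

// Exception-safe construction: assign fields inside one try block; on
// failure, attempt best-effort cleanup, then rethrow the original error.
class Bootstrapper(settings: Map[String, String]) {
  private var _resource: AutoCloseable = _

  def stop(): Unit = {
    // Null guard makes stop() safe even if construction failed early.
    if (_resource != null) {
      _resource.close()
      _resource = null
    }
  }

  try {
    require(settings.contains("endpoint"), "An endpoint must be configured")
    _resource = new AutoCloseable {
      override def close(): Unit = Console.err.println("resource closed")
    }
  } catch {
    case NonFatal(e) =>
      Console.err.println(s"Error during initialization: $e")
      try {
        stop()
      } catch {
        case NonFatal(inner) =>
          Console.err.println(s"Error stopping after init failure: $inner")
      } finally {
        throw e // the original cause always propagates to the caller
      }
  }
}
```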
/**
@@ -544,19 +631,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
- // Post init
- taskScheduler.postStartHook()
-
- private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-
- private def initDriverMetrics() {
- SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
- SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
- }
-
- initDriverMetrics()
-
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD.
@@ -1146,7 +1220,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* this application is supported. This is currently only available for YARN.
*/
private[spark] def supportDynamicAllocation =
- master.contains("yarn") || dynamicAllocationTesting
+ master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
/**
* :: DeveloperApi ::
@@ -1163,7 +1237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* This is currently only supported in YARN mode. Return whether the request is received.
*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
- assert(master.contains("yarn") || dynamicAllocationTesting,
+ assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
@@ -1403,28 +1477,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
def stop() {
// Use the stopping variable to ensure no contention for the stop scenario.
// Still track the stopped variable for use elsewhere in the code.
-
if (!stopped.compareAndSet(false, true)) {
logInfo("SparkContext already stopped.")
return
}
-
+
postApplicationEnd()
- ui.foreach(_.stop())
- env.metricsSystem.report()
- metadataCleaner.cancel()
- cleaner.foreach(_.stop())
- executorAllocationManager.foreach(_.stop())
- dagScheduler.stop()
- dagScheduler = null
- listenerBus.stop()
- eventLogger.foreach(_.stop())
- env.rpcEnv.stop(heartbeatReceiver)
- progressBar.foreach(_.stop())
- taskScheduler = null
+ _ui.foreach(_.stop())
+ if (env != null) {
+ env.metricsSystem.report()
+ }
+ if (metadataCleaner != null) {
+ metadataCleaner.cancel()
+ }
+ _cleaner.foreach(_.stop())
+ _executorAllocationManager.foreach(_.stop())
+ if (_dagScheduler != null) {
+ _dagScheduler.stop()
+ _dagScheduler = null
+ }
+ if (_listenerBusStarted) {
+ listenerBus.stop()
+ _listenerBusStarted = false
+ }
+ _eventLogger.foreach(_.stop())
+ if (env != null && _heartbeatReceiver != null) {
+ env.rpcEnv.stop(_heartbeatReceiver)
+ }
+ _progressBar.foreach(_.stop())
+ _taskScheduler = null
// TODO: Cache.stop()?
- env.stop()
- SparkEnv.set(null)
+ if (_env != null) {
+ _env.stop()
+ SparkEnv.set(null)
+ }
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
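Editor's note: the reworked stop() above pairs an AtomicBoolean compareAndSet guard with per-component null checks and Option.foreach, so repeated or concurrent calls, and calls on a half-built context, are all harmless. Roughly, under assumed names:

```scala
import java.util.concurrent.ExecutorService
import java.util.concurrent.atomic.AtomicBoolean

// Idempotent, guard-everything shutdown, as in the stop() hunk above.
class Lifecycle {
  private val stopped = new AtomicBoolean(false)
  private var _pool: ExecutorService = _
  private var _reporter: Option[AutoCloseable] = None

  def stop(): Unit = {
    // First caller wins; later calls return immediately.
    if (!stopped.compareAndSet(false, true)) {
      return
    }
    // Option fields are simply empty when init never reached them.
    _reporter.foreach(_.close())
    // Plain references need a null check before use.
    if (_pool != null) {
      _pool.shutdown()
      _pool = null
    }
  }
}
```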
@@ -1749,6 +1835,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
listenerBus.start(this)
+ _listenerBusStarted = true
}
/** Post the application start event */
@@ -2152,7 +2239,7 @@ object SparkContext extends Logging {
master match {
case "local" =>
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(scheduler, 1)
+ val backend = new LocalBackend(sc.getConf, scheduler, 1)
scheduler.initialize(backend)
(backend, scheduler)
@@ -2164,7 +2251,7 @@ object SparkContext extends Logging {
throw new SparkException(s"Asked to run locally with $threadCount threads")
}
val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
- val backend = new LocalBackend(scheduler, threadCount)
+ val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
@@ -2174,7 +2261,7 @@ object SparkContext extends Logging {
// local[N, M] means exactly N threads with M failures
val threadCount = if (threads == "*") localCpuCount else threads.toInt
val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
- val backend = new LocalBackend(scheduler, threadCount)
+ val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
scheduler.initialize(backend)
(backend, scheduler)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 516f619529..1b5fdeba28 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
import java.lang.management.ManagementFactory
import java.net.URL
import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -60,8 +60,6 @@ private[spark] class Executor(
private val conf = env.conf
- @volatile private var isStopped = false
-
// No ip or host:port - just hostname
Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
// must not have port specified.
@@ -114,6 +112,10 @@ private[spark] class Executor(
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
+ // Executor for the heartbeat task.
+ private val heartbeater = Executors.newSingleThreadScheduledExecutor(
+ Utils.namedThreadFactory("driver-heartbeater"))
+
startDriverHeartbeater()
def launchTask(
@@ -138,7 +140,8 @@ private[spark] class Executor(
def stop(): Unit = {
env.metricsSystem.report()
env.rpcEnv.stop(executorEndpoint)
- isStopped = true
+ heartbeater.shutdown()
+ heartbeater.awaitTermination(10, TimeUnit.SECONDS)
threadPool.shutdown()
if (!isLocal) {
env.stop()
@@ -432,23 +435,17 @@ private[spark] class Executor(
}
/**
- * Starts a thread to report heartbeat and partial metrics for active tasks to driver.
- * This thread stops running when the executor is stopped.
+ * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
- val thread = new Thread() {
- override def run() {
- // Sleep a random interval so the heartbeats don't end up in sync
- Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
- while (!isStopped) {
- reportHeartBeat()
- Thread.sleep(intervalMs)
- }
- }
+
+ // Wait a random interval so the heartbeats don't end up in sync
+ val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+ val heartbeatTask = new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
- thread.setDaemon(true)
- thread.setName("driver-heartbeater")
- thread.start()
+ heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
}
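Editor's note: the Executor change replaces a hand-rolled Thread.sleep loop with a single-threaded ScheduledExecutorService, which gives a well-defined shutdown through shutdown() and awaitTermination(). A standalone sketch of the same scheduling pattern, with a stand-in task in place of reportHeartBeat():

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

object HeartbeaterSketch {
  // Named daemon threads, similar in spirit to Utils.namedThreadFactory.
  private val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "demo-heartbeater")
      t.setDaemon(true)
      t
    }
  }
  private val heartbeater = Executors.newSingleThreadScheduledExecutor(factory)

  def start(intervalMs: Long): Unit = {
    // Randomize the initial delay so several processes do not beat in sync.
    val initialDelay = intervalMs + (scala.util.Random.nextDouble() * intervalMs).toLong
    val task = new Runnable {
      override def run(): Unit =
        try println("heartbeat") catch { case NonFatal(e) => e.printStackTrace() }
    }
    heartbeater.scheduleAtFixedRate(task, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    heartbeater.shutdown()
    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```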
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 8e3c30fc3d..5a74c13b38 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -86,11 +86,11 @@ private[nio] class ConnectionManager(
conf.get("spark.network.timeout", "120s"))
// Get the thread counts from the Spark Configuration.
- //
+ //
// Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
// we only query for the minimum value because we are using LinkedBlockingDeque.
- //
- // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
+ //
+ // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
// an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
// parameter is necessary.
private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
@@ -989,6 +989,7 @@ private[nio] class ConnectionManager(
def stop() {
ackTimeoutMonitor.stop()
+ selector.wakeup()
selectorThread.interrupt()
selectorThread.join()
selector.close()
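Editor's note: the one-line ConnectionManager fix calls selector.wakeup() before interrupting and joining the selector thread; without it, a loop blocked inside select() may not notice shutdown promptly and join() can stall. A minimal, non-Spark illustration of why the wakeup matters:

```scala
import java.nio.channels.Selector

object SelectorShutdownSketch {
  @volatile private var running = true

  def main(args: Array[String]): Unit = {
    val selector = Selector.open()
    val selectorThread = new Thread(new Runnable {
      override def run(): Unit = {
        while (running) {
          // Blocks until a channel is ready or wakeup() is called.
          selector.select()
        }
      }
    }, "selector-thread")
    selectorThread.start()

    Thread.sleep(100) // let the loop block inside select()

    // Flip the flag, then wake the blocked select() so the loop can
    // observe it; otherwise join() would wait on a still-blocked thread.
    running = false
    selector.wakeup()
    selectorThread.join()
    selector.close()
  }
}
```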
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ecc8bf1899..13a52d836f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -142,11 +142,10 @@ private[spark] class TaskSchedulerImpl(
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
- import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
SPECULATION_INTERVAL_MS milliseconds) {
Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
- }
+ }(sc.env.actorSystem.dispatcher)
}
}
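Editor's note: the TaskSchedulerImpl hunk stops importing the Akka dispatcher implicitly and instead passes it explicitly as the trailing argument of schedule(...), which limits the execution context to that single call. The same idea in plain scala.concurrent, with an illustrative thread pool:

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object ExplicitExecutionContextSketch {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(2)
    val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // Pass the ExecutionContext explicitly instead of importing an
    // implicit; its scope is now limited to this one call site.
    val work = Future { Thread.currentThread().getName }(ec)
    println("ran on " + Await.result(work, 5.seconds))

    pool.shutdown()
  }
}
```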
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 70a477a689..50ba0b9d5a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer
import java.util.concurrent.{Executors, TimeUnit}
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.Utils
private case class ReviveOffers()
@@ -71,11 +71,15 @@ private[spark] class LocalEndpoint(
case KillTask(taskId, interruptThread) =>
executor.killTask(taskId, interruptThread)
+ }
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case StopExecutor =>
executor.stop()
+ context.reply(true)
}
+
def reviveOffers() {
val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
val tasks = scheduler.resourceOffers(offers).flatten
@@ -104,8 +108,11 @@ private[spark] class LocalEndpoint(
* master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
* on a single Executor (created by the LocalBackend) running locally.
*/
-private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
- extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalBackend(
+ conf: SparkConf,
+ scheduler: TaskSchedulerImpl,
+ val totalCores: Int)
+ extends SchedulerBackend with ExecutorBackend with Logging {
private val appId = "local-" + System.currentTimeMillis
var localEndpoint: RpcEndpointRef = null
@@ -116,7 +123,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
}
override def stop() {
- localEndpoint.send(StopExecutor)
+ localEndpoint.sendWithReply(StopExecutor)
}
override def reviveOffers() {
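Editor's note: LocalBackend.stop() now uses sendWithReply(StopExecutor), and the endpoint handles the message in receiveAndReply and acknowledges with context.reply(true), so shutdown is no longer fire-and-forget. A rough sketch of that ask-and-acknowledge shape using plain Promises as stand-ins for Spark's RpcEnv:

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}

object AskReplySketch {
  // Stand-ins for Spark's RpcEndpoint / RpcCallContext, just to show the shape.
  final case class StopWorker(reply: Promise[Boolean])

  class WorkerEndpoint {
    def receiveAndReply(msg: Any): Unit = msg match {
      case StopWorker(reply) =>
        // ... shut the worker down here ...
        reply.success(true) // acknowledge so the caller can return
    }
  }

  def main(args: Array[String]): Unit = {
    val endpoint = new WorkerEndpoint
    val ack = Promise[Boolean]()
    endpoint.receiveAndReply(StopWorker(ack))
    // The caller waits for the acknowledgement, mirroring sendWithReply.
    println("stopped: " + Await.result(ack.future, 5.seconds))
  }
}
```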
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 6b3049b28c..22acc270b9 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -56,19 +56,13 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
// Min < 0
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
intercept[SparkException] { contexts += new SparkContext(conf1) }
- SparkEnv.get.stop()
- SparkContext.clearActiveContext()
// Max < 0
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
intercept[SparkException] { contexts += new SparkContext(conf2) }
- SparkEnv.get.stop()
- SparkContext.clearActiveContext()
// Both min and max, but min > max
intercept[SparkException] { createSparkContext(2, 1) }
- SparkEnv.get.stop()
- SparkContext.clearActiveContext()
// Both min and max, and min == max
val sc1 = createSparkContext(1, 1)