-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 52
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 16
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala | 1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala | 40
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 33
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala | 128
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala | 3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala | 14
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala | 2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala | 9
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala | 3
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 2
29 files changed, 331 insertions(+), 85 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 979d178c35..97109b9f41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
+ private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+ private[spark] val eventLogDir: Option[String] = {
+ if (isEventLogEnabled) {
+ Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+ } else {
+ None
+ }
+ }
+
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus
// Create the Spark execution environment (cache, map output tracker, etc)
+ conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
- // Optionally log Spark events
- private[spark] val eventLogger: Option[EventLoggingListener] = {
- if (conf.getBoolean("spark.eventLog.enabled", false)) {
- val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
- logger.start()
- listenerBus.addListener(logger)
- Some(logger)
- } else None
- }
-
- // At this point, all relevant SparkListeners have been registered, so begin releasing events
- listenerBus.start()
-
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
// constructor
taskScheduler.start()
+ val applicationId: String = taskScheduler.applicationId()
+ conf.set("spark.app.id", applicationId)
+
+ val metricsSystem = env.metricsSystem
+
+  // The driver's metrics system needs spark.app.id to be set to the application ID,
+  // so it must start only after we get the app ID from the task scheduler and set spark.app.id.
+ metricsSystem.start()
+
+ // Optionally log Spark events
+ private[spark] val eventLogger: Option[EventLoggingListener] = {
+ if (isEventLogEnabled) {
+ val logger =
+ new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+ logger.start()
+ listenerBus.addListener(logger)
+ Some(logger)
+ } else None
+ }
+
+ // At this point, all relevant SparkListeners have been registered, so begin releasing events
+ listenerBus.start()
+
private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()
- private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
- private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+ private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+ private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
- listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+ listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
}
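
A condensed sketch (not part of the patch) of the driver-side startup ordering that this change establishes in SparkContext:

    // 1. conf.set("spark.executor.id", "driver")      before SparkEnv.create, so the driver's
    //    MetricsSystem can later build names of the form <app ID>.driver.<source name>
    // 2. taskScheduler.start()
    // 3. val applicationId = taskScheduler.applicationId(); conf.set("spark.app.id", applicationId)
    // 4. env.metricsSystem.start()                     only now, because it needs spark.app.id
    // 5. eventLogger (if spark.eventLog.enabled) logs under <spark.eventLog.dir>/<app ID>
    // 6. listenerBus.start()                           after all listeners are registered
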
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 009ed64775..72cac42cd2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
}
val metricsSystem = if (isDriver) {
+      // Don't start the metrics system for the driver here. It needs the application ID
+      // from the task scheduler, so SparkContext starts it once that ID is available.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
- MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+ val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+ ms.start()
+ ms
}
- metricsSystem.start()
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 432b552c58..f98b531316 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
- SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
+ ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
- val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+
+ val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
+ val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
- val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+ val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
- var msg = s"No event logs found for application $appName in $eventLogDir."
+ var msg = s"No event logs found for application $appName in $appEventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13af5b6f58..06061edfc0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
executorId: String,
hostname: String,
cores: Int,
+ appId: String,
workerUrl: Option[String]) {
SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
- val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
+ val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+ Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()
// Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]) {
args.length match {
- case x if x < 4 =>
+ case x if x < 5 =>
System.err.println(
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
- "<cores> [<workerUrl>]")
+ "<cores> <appid> [<workerUrl>] ")
System.exit(1)
- case 4 =>
- run(args(0), args(1), args(2), args(3).toInt, None)
- case x if x > 4 =>
- run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+ case 5 =>
+ run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+ case x if x > 5 =>
+ run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
}
}
}
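
For reference, a minimal sketch (not part of the patch) of how the executor backend is launched after this change: the application ID becomes the required fifth argument, and the optional worker URL moves to the sixth position. The driver URL, host names and IDs below are hypothetical.

    // Hypothetical launch arguments; only the ordering is taken from the patch above.
    CoarseGrainedExecutorBackend.main(Array(
      "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler", // <driverUrl>
      "1",                          // <executorId>
      "worker-host",                // <hostname>
      "4",                          // <cores>
      "app-20141009103045-0000",    // <appid> (new in this patch)
      "akka.tcp://sparkWorker@worker-host:35000/user/Worker" // [<workerUrl>], standalone mode only
    ))
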
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7211ae465..9bbfcdc4a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,6 +74,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
+ conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d672158656..c4d73622c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
override val metricRegistry = new MetricRegistry()
- // TODO: It would be nice to pass the application name here
- override val sourceName = "executor.%s".format(executorId)
+ override val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a42c8b43bb..bca0b15226 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+ val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index fd316a89a1..5dd67b0cbf 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
metricsConfig.initialize()
- registerSources()
- registerSinks()
def start() {
+ registerSources()
+ registerSinks()
sinks.foreach(_.start)
}
@@ -98,10 +98,39 @@ private[spark] class MetricsSystem private (
sinks.foreach(_.report())
}
+ /**
+ * Build a name that uniquely identifies each metric source.
+ * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
+ * If either ID is not available, this defaults to just using <source name>.
+ *
+ * @param source Metric source to be named by this method.
+   * @return A unique metric name for each combination of
+ * application, executor/driver and metric source.
+ */
+ def buildRegistryName(source: Source): String = {
+ val appId = conf.getOption("spark.app.id")
+ val executorId = conf.getOption("spark.executor.id")
+ val defaultName = MetricRegistry.name(source.sourceName)
+
+ if (instance == "driver" || instance == "executor") {
+ if (appId.isDefined && executorId.isDefined) {
+ MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+ } else {
+        // Only the driver and executors have spark.app.id and spark.executor.id set;
+        // other instances (e.g. Master and Worker) are not tied to a specific application.
+ val warningMsg = s"Using default name $defaultName for source because %s is not set."
+ if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
+ if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+ defaultName
+ }
+ } else { defaultName }
+ }
+
def registerSource(source: Source) {
sources += source
try {
- registry.register(source.sourceName, source.metricRegistry)
+ val regName = buildRegistryName(source)
+ registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
@@ -109,8 +138,9 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
+ val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
}
@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
- case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+ case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
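
A rough illustration (not part of the patch) of the registry names buildRegistryName now produces, assuming a hypothetical application ID:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.app.id", "app-20141009103045-0000")   // hypothetical app ID
      .set("spark.executor.id", "driver")
    // For a "driver" or "executor" instance with both IDs set, a source named
    // "DAGScheduler" is registered as:
    //   app-20141009103045-0000.driver.DAGScheduler
    // An executor with spark.executor.id = "executor.1" registers its "executor" source as:
    //   app-20141009103045-0000.executor.1.executor
    // Any other instance (e.g. "master", "worker"), or a missing ID, falls back to the
    // plain source name, e.g. "DAGScheduler".
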
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 94944399b1..12668b6c09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
- override val sourceName = "%s.DAGScheduler".format(sc.appName)
+ override val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 64b32ae0ed..100c9ba9b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,25 +43,23 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
- appName: String,
+ appId: String,
+ logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
- def this(appName: String, sparkConf: SparkConf) =
- this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+ this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
- private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
- private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
- .toLowerCase + "-" + System.currentTimeMillis
- val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-
+ val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
+ val logDirName: String = logDir.split("/").last
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -69,13 +67,6 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
/**
- * Return only the unique application directory without the base directory.
- */
- def getApplicationLogDir(): String = {
- name
- }
-
- /**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
*/
@@ -185,6 +176,18 @@ private[spark] object EventLoggingListener extends Logging {
}
/**
+ * Return a file-system-safe path to the log directory for the given application.
+ *
+   * @param logBaseDir The base directory under which the application's log directory is created.
+ * @param appId A unique app ID.
+ * @return A path which consists of file-system-safe characters.
+ */
+ def getLogDirPath(logBaseDir: String, appId: String): String = {
+ val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
+ Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+ }
+
+ /**
* Parse the event logging information associated with the logs in the given directory.
*
* Specifically, this looks for event log files, the Spark version file, the compression
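
A small sketch (not part of the patch) of what getLogDirPath produces for a hypothetical base directory and app ID; the exact prefix depends on how Utils.resolveURI resolves the base directory:

    // Characters in " :/" are replaced with "-", characters in "${}'\"" with "_", and the ID is
    // lower-cased before being appended to the resolved base directory (hypothetical values):
    val dir = EventLoggingListener.getLogDirPath("/tmp/spark-events", "app-20141009103045-0000")
    // dir would be something like "file:/tmp/spark-events/app-20141009103045-0000"
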
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a0be8307ef..992c477493 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
+ private val appId = "spark-application-" + System.currentTimeMillis
+
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
def isReady(): Boolean = true
/**
- * The application ID associated with the job, if any.
+ * Get an application ID associated with the job.
*
- * @return The application ID, or None if the backend does not provide an ID.
+ * @return An application ID
*/
- def applicationId(): Option[String] = None
+ def applicationId(): String = appId
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1c1ce666ea..a129a434c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -31,6 +31,8 @@ import org.apache.spark.storage.BlockManagerId
*/
private[spark] trait TaskScheduler {
+ private val appId = "spark-application-" + System.currentTimeMillis
+
def rootPool: Pool
def schedulingMode: SchedulingMode
@@ -66,10 +68,10 @@ private[spark] trait TaskScheduler {
blockManagerId: BlockManagerId): Boolean
/**
- * The application ID associated with the job, if any.
+ * Get an application ID associated with the job.
*
- * @return The application ID, or None if the backend does not provide an ID.
+ * @return An application ID
*/
- def applicationId(): Option[String] = None
+ def applicationId(): String = appId
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 633e892554..4dc550413c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -492,7 +492,7 @@ private[spark] class TaskSchedulerImpl(
}
}
- override def applicationId(): Option[String] = backend.applicationId()
+ override def applicationId(): String = backend.applicationId()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 5c5ecc8434..ed209d195e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -68,9 +68,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
- val eventLogDir = sc.eventLogger.map(_.logDir)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, eventLogDir)
+ appUIAddress, sc.eventLogDir)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -129,7 +128,11 @@ private[spark] class SparkDeploySchedulerBackend(
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
- override def applicationId(): Option[String] = Option(appId)
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
private def waitForRegistration() = {
registrationLock.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3161f1ee9f..90828578cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,6 +76,8 @@ private[spark] class CoarseMesosSchedulerBackend(
var nextMesosTaskId = 0
+ @volatile var appId: String = _
+
def newMesosTaskId(): Int = {
val id = nextMesosTaskId
nextMesosTaskId += 1
@@ -167,7 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
+ appId = frameworkId.getValue
+ logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
@@ -313,4 +316,10 @@ private[spark] class CoarseMesosSchedulerBackend(
slaveLost(d, s)
}
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 4c49aa074e..b11786368e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
/**
@@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend(
var classLoader: ClassLoader = null
+ @volatile var appId: String = _
+
override def start() {
synchronized {
classLoader = Thread.currentThread.getContextClassLoader
@@ -177,7 +179,8 @@ private[spark] class MesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
val oldClassLoader = setClassLoader()
try {
- logInfo("Registered as framework ID " + frameworkId.getValue)
+ appId = frameworkId.getValue
+ logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
@@ -372,4 +375,10 @@ private[spark] class MesosSchedulerBackend(
// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9ea25c2bc7..58b78f041c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -88,6 +88,7 @@ private[spark] class LocalActor(
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
extends SchedulerBackend with ExecutorBackend {
+ private val appId = "local-" + System.currentTimeMillis
var localActor: ActorRef = null
override def start() {
@@ -115,4 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
localActor ! StatusUpdate(taskId, state, serializedData)
}
+ override def applicationId(): String = appId
+
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 49fea6d9e2..8569c6f3cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
-private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+private[spark] class BlockManagerSource(val blockManager: BlockManager)
extends Source {
override val metricRegistry = new MetricRegistry()
- override val sourceName = "%s.BlockManager".format(sc.appName)
+ override val sourceName = "BlockManager"
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index e42b181194..3925f0ccbd 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,14 +17,15 @@
package org.apache.spark.metrics
-import org.apache.spark.metrics.source.Source
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.metrics.source.Source
-import scala.collection.mutable.ArrayBuffer
+import com.codahale.metrics.MetricRegistry
+import scala.collection.mutable.ArrayBuffer
class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
var filePath: String = _
@@ -39,6 +40,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
+ metricsSystem.start()
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
@@ -49,6 +51,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
+ metricsSystem.start()
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
@@ -60,4 +63,125 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
metricsSystem.registerSource(source)
assert(metricsSystem.invokePrivate(sources()).length === 1)
}
+
+ test("MetricsSystem with Driver instance") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "driver"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appId.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Driver instance and spark.app.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "driver"
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Driver instance and spark.executor.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.id", appId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Executor instance") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "executor.1"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appId.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance and spark.app.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "executor.1"
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Executor instance and spark.executor.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.id", appId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with instance which is neither Driver nor Executor") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "dummyExecutorId"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "testInstance"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+
+ // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name.
+ assert(metricName != s"$appId.$executorId.${source.sourceName}")
+ assert(metricName === source.sourceName)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e5315bc93e..3efa854318 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -169,7 +169,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
// Verify logging directory exists
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
eventLogger.start()
val logPath = new Path(eventLogger.logDir)
assert(fileSystem.exists(logPath))
@@ -209,7 +211,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
// Verify that all information is correctly parsed before stop()
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
eventLogger.start()
var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -228,7 +232,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
*/
private def testEventLogging(compressionCodec: Option[String] = None) {
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
@@ -408,4 +414,6 @@ object EventLoggingListenerSuite {
}
conf
}
+
+ def getUniqueApplicationId = "test-" + System.currentTimeMillis
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7ab351d1b4..48114feee6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -155,7 +155,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* This child listener inherits only the event buffering functionality, but does not actually
* log the events.
*/
- private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+ private class EventMonster(conf: SparkConf)
+ extends EventLoggingListener("test", "testdir", conf) {
logger.close()
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 99c8d13231..eb6e88cf55 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
@@ -36,6 +37,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
+ val executorStarted = new Semaphore(0)
assert(executor.isAllEmpty)
@@ -43,6 +45,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val executingThread = new Thread() {
override def run() {
executor.start()
+ executorStarted.release(1)
executor.awaitTermination()
}
}
@@ -57,6 +60,9 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
}
+ // Ensure executor is started
+ executorStarted.acquire()
+
// Verify that receiver was started
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
@@ -186,10 +192,10 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
- var otherThread: Thread = null
- var receiving = false
- var onStartCalled = false
- var onStopCalled = false
+ @volatile var otherThread: Thread = null
+ @volatile var receiving = false
+ @volatile var onStartCalled = false
+ @volatile var onStopCalled = false
def onStart() {
otherThread = new Thread() {
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 10cbeb8b94..229b7a09f4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appAttemptId: String,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
@@ -83,7 +84,7 @@ class ExecutorRunnable(
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- localResources)
+ appAttemptId, localResources)
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index d7a7175d5e..5cb4753de2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -43,6 +43,7 @@ trait ExecutorRunnableUtil extends Logging {
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appId: String,
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -114,6 +115,7 @@ trait ExecutorRunnableUtil extends Logging {
slaveId.toString,
hostname.toString,
executorCores.toString,
+ appId,
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4f4f1d2aaa..e1af8d5a74 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -57,6 +57,7 @@ object AllocationType extends Enumeration {
private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
+ appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
@@ -295,6 +296,7 @@ private[yarn] abstract class YarnAllocator(
executorHostname,
executorMemory,
executorCores,
+ appAttemptId.getApplicationId.toString,
securityMgr)
launcherPool.execute(executorRunnable)
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 200a308992..6bb4b82316 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -155,6 +155,10 @@ private[spark] class YarnClientSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
- override def applicationId(): Option[String] = Option(appId).map(_.toString())
+ override def applicationId(): String =
+ Option(appId).map(_.toString).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 39436d0999..3a186cfeb4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,6 +48,13 @@ private[spark] class YarnClusterSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
- override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+ override def applicationId(): String =
+    // In YARN cluster mode, spark.yarn.app.id is expected to be set before the user
+    // application is launched, so if it is not set, something has gone wrong.
+ sc.getConf.getOption("spark.yarn.app.id").getOrElse {
+ logError("Application ID is not set.")
+ super.applicationId
+ }
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 833be12982..0b5a92d87d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appId: String,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
@@ -80,7 +81,7 @@ class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- localResources)
+ appId, localResources)
logInfo(s"Setting up executor with environment: $env")
logInfo("Setting up executor with commands: " + commands)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e44a8db41b..2bbf5d7db8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -41,7 +41,7 @@ private[yarn] class YarnAllocationHandler(
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
- extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
+ extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
override protected def releaseContainer(container: Container) = {
amClient.releaseAssignedContainer(container.getId())