author    Kousuke Saruta <sarutak@oss.nttdata.co.jp>    2014-10-03 13:48:56 -0700
committer Andrew Or <andrewor14@gmail.com>    2014-10-03 13:48:56 -0700
commit    79e45c9323455a51f25ed9acd0edd8682b4bbb88 (patch)
tree      1eaecef693e1d196cde2836b8d59e5bb67c6da2f
parent    1eb8389cb4ad40a405149b16e2719e12367d667a (diff)
[SPARK-3377] [SPARK-3610] Metrics can be accidentally aggregated / History server log name should not be based on user input
This PR is another solution for #2250.

I'm using the Codahale-based MetricsSystem of Spark with JMX or Graphite, and I saw the following two problems:

(1) When applications that have the same spark.app.name run on a cluster at the same time, some metric names are mixed up. For instance, if two or more applications run on the cluster at the same time, each application emits the same-named metric, like "SparkPi.DAGScheduler.stage.failedStages", and Graphite cannot distinguish which application the metric belongs to.

(2) When two or more executors run on the same machine, the JVM metrics of each executor are mixed up. For instance, two or more executors running on the same node can emit the same-named metric, "jvm.memory", and Graphite cannot distinguish which executor the metric comes from.

There is a similar issue with event logs. The directory for event logs is named using the application name. The application name is defined by the user and can include characters that are illegal in path names. Furthermore, the directory name consists of the application name and System.currentTimeMillis even though each application has a unique application ID, so if we run jobs that have the same name, it is difficult to identify which directory is for which application.

Closes #2250
Closes #1067

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #2432 from sarutak/metrics-structure-improvement2 and squashes the following commits:

3288b2b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
39169e4 [Kousuke Saruta] Fixed style
6570494 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
817e4f0 [Kousuke Saruta] Simplified MetricsSystem#buildRegistryName
67fa5eb [Kousuke Saruta] Unified MetricsSystem#registerSources and registerSinks in start
10be654 [Kousuke Saruta] Fixed style.
990c078 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f0c7fba [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
59cc2cd [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite
f9b6fb3 [Kousuke Saruta] Modified style.
2cf8a0f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
389090d [Kousuke Saruta] Replaced taskScheduler.applicationId() with getApplicationId in SparkContext#postApplicationStart
ff45c89 [Kousuke Saruta] Added some test cases to MetricsSystemSuite
69c46a6 [Kousuke Saruta] Added warning logging logic to MetricsSystem#buildRegistryName
5cca0d2 [Kousuke Saruta] Added Javadoc comment to SparkContext#getApplicationId
16a9f01 [Kousuke Saruta] Added data types to be returned to some methods
6434b06 [Kousuke Saruta] Reverted changes related to ApplicationId
0413b90 [Kousuke Saruta] Deleted ApplicationId.java and ApplicationIdSuite.java
a42300c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0fc1b09 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
42bea55 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
248935d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
f6af132 [Kousuke Saruta] Modified SchedulerBackend and TaskScheduler to return System.currentTimeMillis as a unique Application Id
1b8b53e [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
97cb85c [Kousuke Saruta] Resolved conflict in MimaExcludes
2cdd009 [Kousuke Saruta] Modified default implementation of applicationId
9aadb0b [Kousuke Saruta] Modified NetworkReceiverSuite to ensure "executor.start()" is finished in test "network receiver life cycle"
3011efc [Kousuke Saruta] Added ApplicationIdSuite.scala
d009c55 [Kousuke Saruta] Modified ApplicationId#equals to compare appIds
dfc83fd [Kousuke Saruta] Modified ApplicationId to implement Serializable
9ff4851 [Kousuke Saruta] Modified MimaExcludes.scala to ignore createTaskScheduler method in SparkContext
4567ffc [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
6a91b14 [Kousuke Saruta] Modified SparkContextSchedulerCreationSuite, ExecutorRunnerTest and EventLoggingListenerSuite
0325caf [Kousuke Saruta] Added ApplicationId.scala
0a2fc14 [Kousuke Saruta] Modified style
eabda80 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
0f890e6 [Kousuke Saruta] Modified SparkDeploySchedulerBackend and Master to pass baseLogDir instead of eventLogDir
bcf25bf [Kousuke Saruta] Modified directory name for EventLogs
28d4d93 [Kousuke Saruta] Modified SparkContext and EventLoggingListener so that the directory for EventLogs is named after the Application ID
203634e [Kousuke Saruta] Modified comment in SchedulerBackend#applicationId and TaskScheduler#applicationId
424fea4 [Kousuke Saruta] Modified the subclasses of TaskScheduler and SchedulerBackend so that they can return a non-optional unique Application ID
b311806 [Kousuke Saruta] Swapped last 2 arguments passed to CoarseGrainedExecutorBackend
8a2b6ec [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
086ee25 [Kousuke Saruta] Merge branch 'metrics-structure-improvement2' of github.com:sarutak/spark into metrics-structure-improvement2
e705386 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
36d2f7a [Kousuke Saruta] Added warning message for the situation where we cannot get an application id for the prefix of the metrics name
eea6e19 [Kousuke Saruta] Modified CoarseGrainedMesosSchedulerBackend and MesosSchedulerBackend so that we can get Application ID
c229fbe [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
e719c39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4a93c7f [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement2
4776f9e [Kousuke Saruta] Modified MetricsSystemSuite.scala
efcb6e1 [Kousuke Saruta] Modified to add application id to metrics name
2ec848a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
3ea7896 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
ead8966 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
08e627e [Kousuke Saruta] Revert "tmp"
7b67f5a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
45bd33d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
93e263a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
848819c [Kousuke Saruta] Merge branch 'metrics-structure-improvement' of github.com:sarutak/spark into metrics-structure-improvement
912a637 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
e4a4593 [Kousuke Saruta] tmp
3e098d8 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
4603a39 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
fa7175b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
15f88a3 [Kousuke Saruta] Modified MetricsSystem#buildRegistryName because conf.get does not return null when the corresponding entry is absent
6f7dcd4 [Kousuke Saruta] Modified constructor of DAGSchedulerSource and BlockManagerSource because the instance of SparkContext is no longer used
6fc5560 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
4e057c9 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into metrics-structure-improvement
85ffc02 [Kousuke Saruta] Revert "Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource"
868e326 [Kousuke Saruta] Modified MetricsSystem to set registry name with unique application-id and driver/executor-id
71609f5 [Kousuke Saruta] Modified sourceName of ExecutorSource, DAGSchedulerSource and BlockManagerSource
55debab [Kousuke Saruta] Modified SparkContext and Executor to set spark.executor.id to identifiers
4180993 [Kousuke Saruta] Modified SparkContext to retain spark.unique.app.name property in SparkConf
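To make the naming problem and its fix concrete, here is a minimal sketch (not part of the patch) of the prefixing rule this change introduces. The app and executor IDs below are hypothetical examples:

object MetricNameSketch {
  // Mirrors the idea behind MetricsSystem#buildRegistryName: prefix the source
  // name with the app ID and executor ID (or "driver") when both are available.
  def registryName(appId: Option[String], executorId: Option[String], sourceName: String): String =
    (appId, executorId) match {
      case (Some(app), Some(exec)) => s"$app.$exec.$sourceName"
      case _ => sourceName // missing IDs: keep the bare source name
    }

  def main(args: Array[String]): Unit = {
    // Before this patch, two concurrent apps named "SparkPi" would both emit
    // "SparkPi.DAGScheduler.stage.failedStages" and collide in Graphite.
    println(registryName(Some("app-20141003131415-0001"), Some("driver"), "DAGScheduler.stage.failedStages"))
    println(registryName(Some("app-20141003131415-0002"), Some("executor.1"), "jvm.memory"))
  }
}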
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  |  52
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala  |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala  |  16
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/Executor.scala  |   1
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala  |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala  |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala  |  40
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala  |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala  |  33
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala  |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala  |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala  |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala  |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala  |  13
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala  |   3
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala  |   4
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala  | 128
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  |  14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala  |   3
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala  |  14
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |   3
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala  |   2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala  |   2
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |   6
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala  |   9
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |   3
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  |   2
29 files changed, 331 insertions(+), 85 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 979d178c35..97109b9f41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
+ private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+ private[spark] val eventLogDir: Option[String] = {
+ if (isEventLogEnabled) {
+ Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+ } else {
+ None
+ }
+ }
+
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val listenerBus = new LiveListenerBus
// Create the Spark execution environment (cache, map output tracker, etc)
+ conf.set("spark.executor.id", "driver")
private[spark] val env = SparkEnv.create(
conf,
"<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
- // Optionally log Spark events
- private[spark] val eventLogger: Option[EventLoggingListener] = {
- if (conf.getBoolean("spark.eventLog.enabled", false)) {
- val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
- logger.start()
- listenerBus.addListener(logger)
- Some(logger)
- } else None
- }
-
- // At this point, all relevant SparkListeners have been registered, so begin releasing events
- listenerBus.start()
-
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
// constructor
taskScheduler.start()
+ val applicationId: String = taskScheduler.applicationId()
+ conf.set("spark.app.id", applicationId)
+
+ val metricsSystem = env.metricsSystem
+
+ // The driver's metrics system needs spark.app.id to be set to the app ID,
+ // so it should start only after we get the app ID from the task scheduler and set spark.app.id.
+ metricsSystem.start()
+
+ // Optionally log Spark events
+ private[spark] val eventLogger: Option[EventLoggingListener] = {
+ if (isEventLogEnabled) {
+ val logger =
+ new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+ logger.start()
+ listenerBus.addListener(logger)
+ Some(logger)
+ } else None
+ }
+
+ // At this point, all relevant SparkListeners have been registered, so begin releasing events
+ listenerBus.start()
+
private[spark] val cleaner: Option[ContextCleaner] = {
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
// Post init
taskScheduler.postStartHook()
- private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
- private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+ private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+ private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
private def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
private def postApplicationStart() {
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
- listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+ listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
}
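The ordering constraint established above (app ID first, then metrics, then event logging, then the listener bus) can be modeled with a small runnable toy. The Toy* classes below are illustrative stand-ins, not Spark's real types:

object DriverStartupOrder {
  // Illustrative stand-ins for SparkContext's real collaborators.
  class ToyScheduler { def applicationId(): String = "app-" + System.currentTimeMillis }
  class ToyMetricsSystem(conf: Map[String, String]) {
    def start(): Unit = println(s"metrics prefix = ${conf("spark.app.id")}.driver")
  }
  class ToyEventLogger(appId: String) {
    def start(): Unit = println(s"event log dir = /tmp/spark-events/$appId")
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new ToyScheduler
    val appId = scheduler.applicationId()       // 1. obtain the ID from the scheduler
    val conf = Map("spark.app.id" -> appId)     // 2. publish it in the conf
    new ToyMetricsSystem(conf).start()          // 3. metrics can now use it as a prefix
    new ToyEventLogger(appId).start()           // 4. and the event log dir is named after it
  }
}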
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 009ed64775..72cac42cd2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
}
val metricsSystem = if (isDriver) {
+ // Don't start the metrics system for the driver right away:
+ // we need to wait for the task scheduler to give us an app ID,
+ // and only then can the metrics system be started.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else {
- MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+ val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+ ms.start()
+ ms
}
- metricsSystem.start()
// Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 432b552c58..f98b531316 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
- SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
+ ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
- val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+
+ val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
+ val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
SparkHadoopUtil.get.newConfiguration(conf))
- val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+ val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec
if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
- var msg = s"No event logs found for application $appName in $eventLogDir."
+ var msg = s"No event logs found for application $appName in $appEventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13af5b6f58..06061edfc0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
executorId: String,
hostname: String,
cores: Int,
+ appId: String,
workerUrl: Option[String]) {
SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
- val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
+ val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+ Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()
// Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
def main(args: Array[String]) {
args.length match {
- case x if x < 4 =>
+ case x if x < 5 =>
System.err.println(
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
- "<cores> [<workerUrl>]")
+ "<cores> <appid> [<workerUrl>]")
System.exit(1)
- case 4 =>
- run(args(0), args(1), args(2), args(3).toInt, None)
- case x if x > 4 =>
- run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+ case 5 =>
+ run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+ case x if x > 5 =>
+ run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
}
}
}
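For reference, the new argument contract can be exercised with a toy dispatcher like the one below; run() here just prints, it is not the real backend:

object ArgDispatchSketch {
  def run(driverUrl: String, executorId: String, hostname: String,
      cores: Int, appId: String, workerUrl: Option[String]): Unit =
    println(s"driver=$driverUrl executor=$executorId host=$hostname " +
      s"cores=$cores appId=$appId worker=$workerUrl")

  def main(args: Array[String]): Unit = args.length match {
    case x if x < 5 =>
      System.err.println(
        "Usage: ArgDispatchSketch <driverUrl> <executorId> <hostname> <cores> <appid> [<workerUrl>]")
    case 5 => run(args(0), args(1), args(2), args(3).toInt, args(4), None)
    case _ => run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
  }
}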
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7211ae465..9bbfcdc4a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,6 +74,7 @@ private[spark] class Executor(
val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
+ conf.set("spark.executor.id", "executor." + executorId)
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d672158656..c4d73622c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
override val metricRegistry = new MetricRegistry()
- // TODO: It would be nice to pass the application name here
- override val sourceName = "executor.%s".format(executorId)
+ override val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a42c8b43bb..bca0b15226 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
- val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+ val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
executor = new Executor(
executorInfo.getExecutorId.getValue,
slaveInfo.getHostname,
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index fd316a89a1..5dd67b0cbf 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
metricsConfig.initialize()
- registerSources()
- registerSinks()
def start() {
+ registerSources()
+ registerSinks()
sinks.foreach(_.start)
}
@@ -98,10 +98,39 @@ private[spark] class MetricsSystem private (
sinks.foreach(_.report())
}
+ /**
+ * Build a name that uniquely identifies each metric source.
+ * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
+ * If either ID is not available, this defaults to just using <source name>.
+ *
+ * @param source Metric source to be named by this method.
+ * @return A unique metric name for each combination of
+ * application, executor/driver and metric source.
+ */
+ def buildRegistryName(source: Source): String = {
+ val appId = conf.getOption("spark.app.id")
+ val executorId = conf.getOption("spark.executor.id")
+ val defaultName = MetricRegistry.name(source.sourceName)
+
+ if (instance == "driver" || instance == "executor") {
+ if (appId.isDefined && executorId.isDefined) {
+ MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+ } else {
+ // Only the driver and executors have spark.app.id and spark.executor.id set.
+ // Other instances, such as Master and Worker, are not tied to a specific application.
+ val warningMsg = s"Using default name $defaultName for source because %s is not set."
+ if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
+ if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+ defaultName
+ }
+ } else { defaultName }
+ }
+
def registerSource(source: Source) {
sources += source
try {
- registry.register(source.sourceName, source.metricRegistry)
+ val regName = buildRegistryName(source)
+ registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
@@ -109,8 +138,9 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
+ val regName = buildRegistryName(source)
registry.removeMatching(new MetricFilter {
- def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+ def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
}
@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
- case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+ case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}
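The three naming cases handled by buildRegistryName can be reproduced in isolation. The following self-contained sketch mirrors the logic above, with SparkConf simplified to a plain Map; it covers the same cases the MetricsSystemSuite tests below exercise:

object BuildRegistryNameSketch {
  // Mirrors MetricsSystem#buildRegistryName, with SparkConf replaced by a Map.
  def buildRegistryName(instance: String, conf: Map[String, String], sourceName: String): String = {
    val appId = conf.get("spark.app.id")
    val executorId = conf.get("spark.executor.id")
    if ((instance == "driver" || instance == "executor") && appId.isDefined && executorId.isDefined) {
      s"${appId.get}.${executorId.get}.$sourceName"
    } else {
      // Master, Worker, or missing IDs: fall back to the bare source name.
      sourceName
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = Map("spark.app.id" -> "testId", "spark.executor.id" -> "driver")
    println(buildRegistryName("driver", conf, "DAGScheduler"))                   // testId.driver.DAGScheduler
    println(buildRegistryName("driver", conf - "spark.app.id", "DAGScheduler"))  // DAGScheduler
    println(buildRegistryName("testInstance", conf, "dummySource"))              // dummySource
  }
}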
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 94944399b1..12668b6c09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
- override val sourceName = "%s.DAGScheduler".format(sc.appName)
+ override val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 64b32ae0ed..100c9ba9b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,25 +43,23 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
* spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
*/
private[spark] class EventLoggingListener(
- appName: String,
+ appId: String,
+ logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {
import EventLoggingListener._
- def this(appName: String, sparkConf: SparkConf) =
- this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+ def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+ this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
- private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
- private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
- .toLowerCase + "-" + System.currentTimeMillis
- val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-
+ val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
+ val logDirName: String = logDir.split("/").last
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -69,13 +67,6 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
/**
- * Return only the unique application directory without the base directory.
- */
- def getApplicationLogDir(): String = {
- name
- }
-
- /**
* Begin logging events.
* If compression is used, log a file that indicates which compression library is used.
*/
@@ -185,6 +176,18 @@ private[spark] object EventLoggingListener extends Logging {
}
/**
+ * Return a file-system-safe path to the log directory for the given application.
+ *
+ * @param logBaseDir A base directory for the path to the log directory for the given application.
+ * @param appId A unique app ID.
+ * @return A path which consists of file-system-safe characters.
+ */
+ def getLogDirPath(logBaseDir: String, appId: String): String = {
+ val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
+ Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+ }
+
+ /**
* Parse the event logging information associated with the logs in the given directory.
*
* Specifically, this looks for event log files, the Spark version file, the compression
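The sanitization in getLogDirPath is easy to see in isolation. In the sketch below, Utils.resolveURI is replaced by plain string handling for brevity, and the input ID is a made-up example containing path-hostile characters:

object LogDirSketch {
  def getLogDirPath(logBaseDir: String, appId: String): String = {
    // Spaces, colons and slashes become '-'; shell-sensitive characters become '_'.
    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
    logBaseDir.stripSuffix("/") + "/" + name.stripSuffix("/")
  }

  def main(args: Array[String]): Unit = {
    // A made-up cluster-manager ID containing path-hostile characters:
    println(getLogDirPath("/tmp/spark-events", "app/2014:10:03 Test"))
    // prints /tmp/spark-events/app-2014-10-03-test
  }
}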
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a0be8307ef..992c477493 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
* machines become available and can launch tasks on them.
*/
private[spark] trait SchedulerBackend {
+ private val appId = "spark-application-" + System.currentTimeMillis
+
def start(): Unit
def stop(): Unit
def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
def isReady(): Boolean = true
/**
- * The application ID associated with the job, if any.
+ * Get an application ID associated with the job.
*
- * @return The application ID, or None if the backend does not provide an ID.
+ * @return An application ID
*/
- def applicationId(): Option[String] = None
+ def applicationId(): String = appId
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1c1ce666ea..a129a434c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -31,6 +31,8 @@ import org.apache.spark.storage.BlockManagerId
*/
private[spark] trait TaskScheduler {
+ private val appId = "spark-application-" + System.currentTimeMillis
+
def rootPool: Pool
def schedulingMode: SchedulingMode
@@ -66,10 +68,10 @@ private[spark] trait TaskScheduler {
blockManagerId: BlockManagerId): Boolean
/**
- * The application ID associated with the job, if any.
+ * Get an application ID associated with the job.
*
- * @return The application ID, or None if the backend does not provide an ID.
+ * @return An application ID
*/
- def applicationId(): Option[String] = None
+ def applicationId(): String = appId
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 633e892554..4dc550413c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -492,7 +492,7 @@ private[spark] class TaskSchedulerImpl(
}
}
- override def applicationId(): Option[String] = backend.applicationId()
+ override def applicationId(): String = backend.applicationId()
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 5c5ecc8434..ed209d195e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -68,9 +68,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
- val eventLogDir = sc.eventLogger.map(_.logDir)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, eventLogDir)
+ appUIAddress, sc.eventLogDir)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -129,7 +128,11 @@ private[spark] class SparkDeploySchedulerBackend(
totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
}
- override def applicationId(): Option[String] = Option(appId)
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
private def waitForRegistration() = {
registrationLock.synchronized {
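The same fallback pattern recurs in the Mesos and YARN backends below. In isolation it looks like this (a toy class, illustrative only): the cluster manager fills in appId asynchronously, so applicationId() falls back to a generated default, with a warning, if asked before registration.

class ToyBackend {
  // Filled in asynchronously when the cluster manager registers the framework.
  @volatile var appId: String = _

  private val defaultAppId = "spark-application-" + System.currentTimeMillis

  def applicationId(): String =
    Option(appId).getOrElse {
      // Option(null) is None, so the warning fires only before registration.
      Console.err.println("Application ID is not initialized yet.")
      defaultAppId
    }
}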
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3161f1ee9f..90828578cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,6 +76,8 @@ private[spark] class CoarseMesosSchedulerBackend(
var nextMesosTaskId = 0
+ @volatile var appId: String = _
+
def newMesosTaskId(): Int = {
val id = nextMesosTaskId
nextMesosTaskId += 1
@@ -167,7 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
+ appId = frameworkId.getValue
+ logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
@@ -313,4 +316,10 @@ private[spark] class CoarseMesosSchedulerBackend(
slaveLost(d, s)
}
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 4c49aa074e..b11786368e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
/**
@@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend(
var classLoader: ClassLoader = null
+ @volatile var appId: String = _
+
override def start() {
synchronized {
classLoader = Thread.currentThread.getContextClassLoader
@@ -177,7 +179,8 @@ private[spark] class MesosSchedulerBackend(
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
val oldClassLoader = setClassLoader()
try {
- logInfo("Registered as framework ID " + frameworkId.getValue)
+ appId = frameworkId.getValue
+ logInfo("Registered as framework ID " + appId)
registeredLock.synchronized {
isRegistered = true
registeredLock.notifyAll()
@@ -372,4 +375,10 @@ private[spark] class MesosSchedulerBackend(
// TODO: query Mesos for number of cores
override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+ override def applicationId(): String =
+ Option(appId).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9ea25c2bc7..58b78f041c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -88,6 +88,7 @@ private[spark] class LocalActor(
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
extends SchedulerBackend with ExecutorBackend {
+ private val appId = "local-" + System.currentTimeMillis
var localActor: ActorRef = null
override def start() {
@@ -115,4 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
localActor ! StatusUpdate(taskId, state, serializedData)
}
+ override def applicationId(): String = appId
+
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 49fea6d9e2..8569c6f3cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source
-private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+private[spark] class BlockManagerSource(val blockManager: BlockManager)
extends Source {
override val metricRegistry = new MetricRegistry()
- override val sourceName = "%s.BlockManager".format(sc.appName)
+ override val sourceName = "BlockManager"
metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
override def getValue: Long = {
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index e42b181194..3925f0ccbd 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,14 +17,15 @@
package org.apache.spark.metrics
-import org.apache.spark.metrics.source.Source
import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.metrics.source.Source
-import scala.collection.mutable.ArrayBuffer
+import com.codahale.metrics.MetricRegistry
+import scala.collection.mutable.ArrayBuffer
class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
var filePath: String = _
@@ -39,6 +40,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
+ metricsSystem.start()
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
@@ -49,6 +51,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
+ metricsSystem.start()
val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
@@ -60,4 +63,125 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
metricsSystem.registerSource(source)
assert(metricsSystem.invokePrivate(sources()).length === 1)
}
+
+ test("MetricsSystem with Driver instance") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "driver"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appId.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Driver instance and spark.app.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "driver"
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Driver instance and spark.executor.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.id", appId)
+
+ val instanceName = "driver"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Executor instance") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "executor.1"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === s"$appId.$executorId.${source.sourceName}")
+ }
+
+ test("MetricsSystem with Executor instance and spark.app.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val executorId = "executor.1"
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with Executor instance and spark.executor.id is not set") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ conf.set("spark.app.id", appId)
+
+ val instanceName = "executor"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+ assert(metricName === source.sourceName)
+ }
+
+ test("MetricsSystem with instance which is neither Driver nor Executor") {
+ val source = new Source {
+ override val sourceName = "dummySource"
+ override val metricRegistry = new MetricRegistry()
+ }
+
+ val appId = "testId"
+ val executorId = "dummyExecutorId"
+ conf.set("spark.app.id", appId)
+ conf.set("spark.executor.id", executorId)
+
+ val instanceName = "testInstance"
+ val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+ val metricName = driverMetricsSystem.buildRegistryName(source)
+
+ // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name.
+ assert(metricName != s"$appId.$executorId.${source.sourceName}")
+ assert(metricName === source.sourceName)
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e5315bc93e..3efa854318 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -169,7 +169,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
// Verify logging directory exists
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
eventLogger.start()
val logPath = new Path(eventLogger.logDir)
assert(fileSystem.exists(logPath))
@@ -209,7 +211,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
// Verify that all information is correctly parsed before stop()
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
eventLogger.start()
var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -228,7 +232,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
*/
private def testEventLogging(compressionCodec: Option[String] = None) {
val conf = getLoggingConf(logDirPath, compressionCodec)
- val eventLogger = new EventLoggingListener("test", conf)
+ val logBaseDir = conf.get("spark.eventLog.dir")
+ val appId = EventLoggingListenerSuite.getUniqueApplicationId
+ val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
val listenerBus = new LiveListenerBus
val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
125L, "Mickey")
@@ -408,4 +414,6 @@ object EventLoggingListenerSuite {
}
conf
}
+
+ def getUniqueApplicationId = "test-" + System.currentTimeMillis
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7ab351d1b4..48114feee6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -155,7 +155,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
* This child listener inherits only the event buffering functionality, but does not actually
* log the events.
*/
- private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+ private class EventMonster(conf: SparkConf)
+ extends EventLoggingListener("test", "testdir", conf) {
logger.close()
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 99c8d13231..eb6e88cf55 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
import scala.collection.mutable.ArrayBuffer
@@ -36,6 +37,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val receiver = new FakeReceiver
val executor = new FakeReceiverSupervisor(receiver)
+ val executorStarted = new Semaphore(0)
assert(executor.isAllEmpty)
@@ -43,6 +45,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
val executingThread = new Thread() {
override def run() {
executor.start()
+ executorStarted.release(1)
executor.awaitTermination()
}
}
@@ -57,6 +60,9 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
}
}
+ // Ensure executor is started
+ executorStarted.acquire()
+
// Verify that receiver was started
assert(receiver.onStartCalled)
assert(executor.isReceiverStarted)
@@ -186,10 +192,10 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
* An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
*/
class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
- var otherThread: Thread = null
- var receiving = false
- var onStartCalled = false
- var onStopCalled = false
+ @volatile var otherThread: Thread = null
+ @volatile var receiving = false
+ @volatile var onStartCalled = false
+ @volatile var onStopCalled = false
def onStart() {
otherThread = new Thread() {
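The Semaphore added to NetworkReceiverSuite above is a standard start-signal handshake. A minimal standalone version, independent of Spark:

import java.util.concurrent.Semaphore

object StartSignalSketch {
  def main(args: Array[String]): Unit = {
    val started = new Semaphore(0)
    val worker = new Thread() {
      override def run(): Unit = {
        println("worker: start() finished")
        started.release(1)  // signal the main thread that startup completed
        // the real test goes on to awaitTermination() here
      }
    }
    worker.start()
    started.acquire()       // blocks until the worker released its permit
    println("main: safe to assert on post-start state")
    worker.join()
  }
}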
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 10cbeb8b94..229b7a09f4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appAttemptId: String,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
@@ -83,7 +84,7 @@ class ExecutorRunnable(
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- localResources)
+ appAttemptId, localResources)
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index d7a7175d5e..5cb4753de2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -43,6 +43,7 @@ trait ExecutorRunnableUtil extends Logging {
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appId: String,
localResources: HashMap[String, LocalResource]): List[String] = {
// Extra options for the JVM
val javaOpts = ListBuffer[String]()
@@ -114,6 +115,7 @@ trait ExecutorRunnableUtil extends Logging {
slaveId.toString,
hostname.toString,
executorCores.toString,
+ appId,
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4f4f1d2aaa..e1af8d5a74 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -57,6 +57,7 @@ object AllocationType extends Enumeration {
private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
+ appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
@@ -295,6 +296,7 @@ private[yarn] abstract class YarnAllocator(
executorHostname,
executorMemory,
executorCores,
+ appAttemptId.getApplicationId.toString,
securityMgr)
launcherPool.execute(executorRunnable)
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 200a308992..6bb4b82316 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -155,6 +155,10 @@ private[spark] class YarnClientSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
- override def applicationId(): Option[String] = Option(appId).map(_.toString())
+ override def applicationId(): String =
+ Option(appId).map(_.toString).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
}
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 39436d0999..3a186cfeb4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,6 +48,13 @@ private[spark] class YarnClusterSchedulerBackend(
totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
}
- override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+ override def applicationId(): String =
+ // In YARN cluster mode, spark.yarn.app.id is expected to be set
+ // before the user application is launched.
+ // So, if spark.yarn.app.id is not set, something is wrong.
+ sc.getConf.getOption("spark.yarn.app.id").getOrElse {
+ logError("Application ID is not set.")
+ super.applicationId
+ }
}
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 833be12982..0b5a92d87d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
hostname: String,
executorMemory: Int,
executorCores: Int,
+ appId: String,
securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {
@@ -80,7 +81,7 @@ class ExecutorRunnable(
ctx.setTokens(ByteBuffer.wrap(dob.getData()))
val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
- localResources)
+ appId, localResources)
logInfo(s"Setting up executor with environment: $env")
logInfo("Setting up executor with commands: " + commands)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e44a8db41b..2bbf5d7db8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -41,7 +41,7 @@ private[yarn] class YarnAllocationHandler(
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
- extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
+ extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
override protected def releaseContainer(container: Container) = {
amClient.releaseAssignedContainer(container.getId())