29 files changed, 331 insertions, 85 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 979d178c35..97109b9f41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging {
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
 
+  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] val eventLogDir: Option[String] = {
+    if (isEventLogEnabled) {
+      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+    } else {
+      None
+    }
+  }
+
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
@@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val listenerBus = new LiveListenerBus
 
   // Create the Spark execution environment (cache, map output tracker, etc)
+  conf.set("spark.executor.id", "driver")
   private[spark] val env = SparkEnv.create(
     conf,
     "<driver>",
@@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
-
-  // At this point, all relevant SparkListeners have been registered, so begin releasing events
-  listenerBus.start()
-
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
@@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging {
   // constructor
   taskScheduler.start()
 
+  val applicationId: String = taskScheduler.applicationId()
+  conf.set("spark.app.id", applicationId)
+
+  val metricsSystem = env.metricsSystem
+
+  // The driver's metrics system needs spark.app.id to be set to the application ID, so
+  // it is started only after the task scheduler has provided the ID and the conf is updated.
+  metricsSystem.start()
+
+  // Optionally log Spark events
+  private[spark] val eventLogger: Option[EventLoggingListener] = {
+    if (isEventLogEnabled) {
+      val logger =
+        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+      logger.start()
+      listenerBus.addListener(logger)
+      Some(logger)
+    } else None
+  }
+
+  // At this point, all relevant SparkListeners have been registered, so begin releasing events
+  listenerBus.start()
+
   private[spark] val cleaner: Option[ContextCleaner] = {
     if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
       Some(new ContextCleaner(this))
@@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // Post init
   taskScheduler.postStartHook()
 
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
+  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
+  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
 
   private def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
@@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private def postApplicationStart() {
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
-    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+    listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
       startTime, sparkUser))
   }
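The SparkContext hunks above reorder driver initialization so that the metrics system and the event logger are created only after the task scheduler can supply an application ID. As a minimal sketch of the resulting ordering (simplified, not the actual SparkContext code):

    // Sketch of the driver start-up ordering this change enforces:
    taskScheduler.start()
    val applicationId = taskScheduler.applicationId()  // now always returns a String
    conf.set("spark.app.id", applicationId)            // consumed by MetricsSystem naming
    env.metricsSystem.start()                          // metric names can now embed the app ID
    eventLogger.foreach(listenerBus.addListener(_))    // log dir is derived from the app ID
    listenerBus.start()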
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 009ed64775..72cac42cd2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -259,11 +259,15 @@ object SparkEnv extends Logging {
     }
 
     val metricsSystem = if (isDriver) {
+      // Don't start the metrics system for the driver yet.
+      // We need to wait until the task scheduler gives us an app ID;
+      // only then can the metrics system be started.
      MetricsSystem.createMetricsSystem("driver", conf, securityManager)
     } else {
-      MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
+      ms.start()
+      ms
     }
-    metricsSystem.start()
 
     // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
     // this is a temporary directory; in distributed mode, this is the executor's current working
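This SparkEnv change splits the two lifecycles: executors still start their MetricsSystem inside SparkEnv.create (spark.app.id is already in their conf at launch), while the driver's is only created there and started later by SparkContext. A hedged sketch of the two paths, with illustrative variable names:

    // Executor path: the app ID arrived with the launch properties, start immediately.
    val executorMs = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
    executorMs.start()

    // Driver path: create only; SparkContext calls start() once spark.app.id is set.
    val driverMs = MetricsSystem.createMetricsSystem("driver", conf, securityManager)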
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 432b552c58..f98b531316 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
 import akka.serialization.SerializationExtension
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
-import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState,
-  SparkHadoopUtil}
+import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
+  ExecutorState, SparkHadoopUtil}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.master.DriverState.DriverState
@@ -693,16 +693,18 @@ private[spark] class Master(
       app.desc.appUiUrl = notFoundBasePath
       return false
     }
-    val fileSystem = Utils.getHadoopFileSystem(eventLogDir,
+
+    val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id)
+    val fileSystem = Utils.getHadoopFileSystem(appEventLogDir,
       SparkHadoopUtil.get.newConfiguration(conf))
-    val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+    val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem)
     val eventLogPaths = eventLogInfo.logPaths
     val compressionCodec = eventLogInfo.compressionCodec
 
     if (eventLogPaths.isEmpty) {
       // Event logging is enabled for this application, but no event logs are found
       val title = s"Application history not found (${app.id})"
-      var msg = s"No event logs found for application $appName in $eventLogDir."
+      var msg = s"No event logs found for application $appName in $appEventLogDir."
       logWarning(msg)
       msg += " Did you specify the correct logging directory?"
       msg = URLEncoder.encode(msg, "UTF-8")
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 13af5b6f58..06061edfc0 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       executorId: String,
       hostname: String,
       cores: Int,
+      appId: String,
       workerUrl: Option[String]) {
 
     SignalLogger.register(log)
@@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val driver = fetcher.actorSelection(driverUrl)
       val timeout = AkkaUtils.askTimeout(executorConf)
       val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
+      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+        Seq[(String, String)](("spark.app.id", appId))
       fetcher.shutdown()
 
       // Create a new ActorSystem using driver's Spark properties to run the backend.
@@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
   def main(args: Array[String]) {
     args.length match {
-      case x if x < 4 =>
+      case x if x < 5 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
           "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
-          "<cores> [<workerUrl>]")
+          "<cores> <appid> [<workerUrl>]")
         System.exit(1)
-      case 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, None)
-      case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
+      case 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+      case x if x > 5 =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
     }
   }
 }
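With the extra appId argument, anything that launches CoarseGrainedExecutorBackend must now pass the application ID as the fifth positional argument, and the backend folds it into the executor's conf as spark.app.id. An illustrative invocation; the driver URL, host names and IDs here are made up:

    // Hypothetical launch, per the new usage string:
    //   <driverUrl> <executorId> <hostname> <cores> <appid> [<workerUrl>]
    CoarseGrainedExecutorBackend.main(Array(
      "akka.tcp://sparkDriver@driver-host:5555/user/CoarseGrainedScheduler",
      "3",                       // executorId
      "worker-host",             // hostname
      "8",                       // cores
      "app-20141001120000-0001"  // appId, forwarded into conf as spark.app.id
    ))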
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7211ae465..9bbfcdc4a0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -74,6 +74,7 @@ private[spark] class Executor(
   val executorSource = new ExecutorSource(this, executorId)
 
   // Initialize Spark environment (using system properties read above)
+  conf.set("spark.executor.id", "executor." + executorId)
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
index d672158656..c4d73622c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String)
 
   override val metricRegistry = new MetricRegistry()
 
-  // TODO: It would be nice to pass the application name here
-  override val sourceName = "executor.%s".format(executorId)
+  override val sourceName = "executor"
 
   // Gauge for executor thread pool's actively executing task counts
   metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] {
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a42c8b43bb..bca0b15226 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend
       slaveInfo: SlaveInfo) {
     logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
     this.driver = driver
-    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
+    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
+      Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
       executorInfo.getExecutorId.getValue,
       slaveInfo.getHostname,
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index fd316a89a1..5dd67b0cbf 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -83,10 +83,10 @@ private[spark] class MetricsSystem private (
   def getServletHandlers =
     metricsServlet.map(_.getHandlers).getOrElse(Array())
 
   metricsConfig.initialize()
-  registerSources()
-  registerSinks()
 
   def start() {
+    registerSources()
+    registerSinks()
     sinks.foreach(_.start)
   }
 
@@ -98,10 +98,39 @@ private[spark] class MetricsSystem private (
     sinks.foreach(_.report())
   }
 
+  /**
+   * Build a name that uniquely identifies each metric source.
+   * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>.
+   * If either ID is not available, this defaults to just using <source name>.
+   *
+   * @param source Metric source to be named by this method.
+   * @return A unique metric name for each combination of
+   *         application, executor/driver and metric source.
+   */
+  def buildRegistryName(source: Source): String = {
+    val appId = conf.getOption("spark.app.id")
+    val executorId = conf.getOption("spark.executor.id")
+    val defaultName = MetricRegistry.name(source.sourceName)
+
+    if (instance == "driver" || instance == "executor") {
+      if (appId.isDefined && executorId.isDefined) {
+        MetricRegistry.name(appId.get, executorId.get, source.sourceName)
+      } else {
+        // Only the driver and executors set spark.app.id and spark.executor.id;
+        // instances such as Master and Worker are not tied to a specific application.
+        val warningMsg = s"Using default name $defaultName for source because %s is not set."
+        if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) }
+        if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) }
+        defaultName
+      }
+    } else { defaultName }
+  }
+
   def registerSource(source: Source) {
     sources += source
     try {
-      registry.register(source.sourceName, source.metricRegistry)
+      val regName = buildRegistryName(source)
+      registry.register(regName, source.metricRegistry)
     } catch {
       case e: IllegalArgumentException => logInfo("Metrics already registered", e)
     }
@@ -109,8 +138,9 @@ private[spark] class MetricsSystem private (
 
   def removeSource(source: Source) {
     sources -= source
+    val regName = buildRegistryName(source)
     registry.removeMatching(new MetricFilter {
-      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
     })
   }
 
@@ -125,7 +155,7 @@ private[spark] class MetricsSystem private (
         val source = Class.forName(classPath).newInstance()
         registerSource(source.asInstanceOf[Source])
       } catch {
-        case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
+        case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
       }
     }
   }
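buildRegistryName is the heart of the change: once spark.app.id and spark.executor.id are both set, every registered source gets an application-scoped prefix, so metrics from concurrent applications (or from an application restarted under the same name) no longer aggregate into one series. An illustration with a made-up app ID; MetricRegistry.name simply joins its parts with dots:

    val conf = new SparkConf()
      .set("spark.app.id", "app-20141001120000-0001")  // hypothetical ID
      .set("spark.executor.id", "driver")
    // A source named "BlockManager" on the "driver" instance is registered as:
    //   app-20141001120000-0001.driver.BlockManager
    // On executor 2 (spark.executor.id = "executor.2") it becomes:
    //   app-20141001120000-0001.executor.2.BlockManager
    // On a Master or Worker instance the name stays just "BlockManager".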
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index 94944399b1..12668b6c09 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.SparkContext
 import org.apache.spark.metrics.source.Source
 
-private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
     extends Source {
   override val metricRegistry = new MetricRegistry()
-  override val sourceName = "%s.DAGScheduler".format(sc.appName)
+  override val sourceName = "DAGScheduler"
 
   metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
     override def getValue: Int = dagScheduler.failedStages.size
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 64b32ae0ed..100c9ba9b7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -43,25 +43,23 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
  */
 private[spark] class EventLoggingListener(
-    appName: String,
+    appId: String,
+    logBaseDir: String,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appName: String, sparkConf: SparkConf) =
-    this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+    this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/")
-  private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_")
-    .toLowerCase + "-" + System.currentTimeMillis
-  val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
-
+  val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
+  val logDirName: String = logDir.split("/").last
 
   protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
     shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -69,13 +67,6 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   /**
-   * Return only the unique application directory without the base directory.
-   */
-  def getApplicationLogDir(): String = {
-    name
-  }
-
-  /**
    * Begin logging events.
    * If compression is used, log a file that indicates which compression library is used.
    */
@@ -185,6 +176,18 @@ private[spark] object EventLoggingListener extends Logging {
   }
 
   /**
+   * Return a file-system-safe path to the log directory for the given application.
+   *
+   * @param logBaseDir The base directory under which the application's log directory is created.
+   * @param appId A unique app ID.
+   * @return A path consisting of file-system-safe characters.
+   */
+  def getLogDirPath(logBaseDir: String, appId: String): String = {
+    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
+    Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+  }
+
+  /**
    * Parse the event logging information associated with the logs in the given directory.
    *
    * Specifically, this looks for event log files, the Spark version file, the compression
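getLogDirPath applies the same character sanitization that was previously applied to the app name, but to the app ID, and drops the appended timestamp, so the log directory is now deterministic per application. A rough illustration; the exact URI prefix depends on what Utils.resolveURI returns for the base directory, and the IDs are made up:

    // A YARN-style ID contains no characters from "[ :/]" and passes through unchanged:
    EventLoggingListener.getLogDirPath("/tmp/spark-events", "application_1412000000000_0001")
    //   => roughly "file:/tmp/spark-events/application_1412000000000_0001"

    // Spaces, colons and slashes become "-" and the result is lower-cased:
    EventLoggingListener.getLogDirPath("/tmp/spark-events", "App:1/Test")
    //   => roughly "file:/tmp/spark-events/app-1-test"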
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index a0be8307ef..992c477493 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -23,6 +23,8 @@ package org.apache.spark.scheduler
  * machines become available and can launch tasks on them.
  */
 private[spark] trait SchedulerBackend {
+  private val appId = "spark-application-" + System.currentTimeMillis
+
   def start(): Unit
   def stop(): Unit
   def reviveOffers(): Unit
@@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend {
   def isReady(): Boolean = true
 
   /**
-   * The application ID associated with the job, if any.
+   * Get an application ID associated with the job.
    *
-   * @return The application ID, or None if the backend does not provide an ID.
+   * @return An application ID
    */
-  def applicationId(): Option[String] = None
+  def applicationId(): String = appId
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1c1ce666ea..a129a434c9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -31,6 +31,8 @@ import org.apache.spark.storage.BlockManagerId
  */
 private[spark] trait TaskScheduler {
 
+  private val appId = "spark-application-" + System.currentTimeMillis
+
   def rootPool: Pool
 
   def schedulingMode: SchedulingMode
@@ -66,10 +68,10 @@ private[spark] trait TaskScheduler {
       blockManagerId: BlockManagerId): Boolean
 
   /**
-   * The application ID associated with the job, if any.
+   * Get an application ID associated with the job.
    *
-   * @return The application ID, or None if the backend does not provide an ID.
+   * @return An application ID
    */
-  def applicationId(): Option[String] = None
+  def applicationId(): String = appId
 
 }
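Both traits now guarantee a non-empty ID: a backend that never learns a real one falls back to a timestamp-based default instead of forcing callers to handle Option. The overriding pattern used by the concrete backends in the following hunks can be summarized as a sketch:

    @volatile var appId: String = _   // set once the cluster manager registers us

    override def applicationId(): String =
      Option(appId).getOrElse {
        logWarning("Application ID is not initialized yet.")
        super.applicationId   // the trait's "spark-application-<timestamp>" default
      }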
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 633e892554..4dc550413c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -492,7 +492,7 @@ private[spark] class TaskSchedulerImpl(
     }
   }
 
-  override def applicationId(): Option[String] = backend.applicationId()
+  override def applicationId(): String = backend.applicationId()
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 5c5ecc8434..ed209d195e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -68,9 +68,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
     val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
-    val eventLogDir = sc.eventLogger.map(_.logDir)
     val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
-      appUIAddress, eventLogDir)
+      appUIAddress, sc.eventLogDir)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
@@ -129,7 +128,11 @@ private[spark] class SparkDeploySchedulerBackend(
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = Option(appId)
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
 
   private def waitForRegistration() = {
     registrationLock.synchronized {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 3161f1ee9f..90828578cd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -76,6 +76,8 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   var nextMesosTaskId = 0
 
+  @volatile var appId: String = _
+
   def newMesosTaskId(): Int = {
     val id = nextMesosTaskId
     nextMesosTaskId += 1
@@ -167,7 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend(
   override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
 
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
+    appId = frameworkId.getValue
+    logInfo("Registered as framework ID " + appId)
     registeredLock.synchronized {
       isRegistered = true
       registeredLock.notifyAll()
@@ -313,4 +316,10 @@ private[spark] class CoarseMesosSchedulerBackend(
     slaveLost(d, s)
   }
 
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 4c49aa074e..b11786368e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -30,7 +30,7 @@ import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
 import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
-import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
 /**
@@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend(
 
   var classLoader: ClassLoader = null
 
+  @volatile var appId: String = _
+
   override def start() {
     synchronized {
       classLoader = Thread.currentThread.getContextClassLoader
@@ -177,7 +179,8 @@ private[spark] class MesosSchedulerBackend(
   override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
     val oldClassLoader = setClassLoader()
     try {
-      logInfo("Registered as framework ID " + frameworkId.getValue)
+      appId = frameworkId.getValue
+      logInfo("Registered as framework ID " + appId)
       registeredLock.synchronized {
         isRegistered = true
         registeredLock.notifyAll()
@@ -372,4 +375,10 @@ private[spark] class MesosSchedulerBackend(
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
 
+  override def applicationId(): String =
+    Option(appId).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
+
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 9ea25c2bc7..58b78f041c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -88,6 +88,7 @@ private[spark] class LocalActor(
 private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
   extends SchedulerBackend with ExecutorBackend {
 
+  private val appId = "local-" + System.currentTimeMillis
   var localActor: ActorRef = null
 
   override def start() {
@@ -115,4 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
 
+  override def applicationId(): String = appId
+
 }
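LocalBackend generates its own ID rather than waiting for a cluster manager, so even local runs get a distinct metric prefix. For example, with an illustrative timestamp:

    val appId = "local-" + System.currentTimeMillis  // e.g. "local-1412000000000"
    // Metric names in local mode then look like:
    //   local-1412000000000.driver.BlockManager.memory.maxMem_MB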
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
index 49fea6d9e2..8569c6f3cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala
@@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.SparkContext
 import org.apache.spark.metrics.source.Source
 
-private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
+private[spark] class BlockManagerSource(val blockManager: BlockManager)
     extends Source {
   override val metricRegistry = new MetricRegistry()
-  override val sourceName = "%s.BlockManager".format(sc.appName)
+  override val sourceName = "BlockManager"
 
   metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] {
     override def getValue: Long = {
diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
index e42b181194..3925f0ccbd 100644
--- a/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.metrics
 
-import org.apache.spark.metrics.source.Source
 import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.master.MasterSource
+import org.apache.spark.metrics.source.Source
 
-import scala.collection.mutable.ArrayBuffer
+import com.codahale.metrics.MetricRegistry
 
+import scala.collection.mutable.ArrayBuffer
 
 class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester{
   var filePath: String = _
@@ -39,6 +40,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
 
   test("MetricsSystem with default config") {
     val metricsSystem = MetricsSystem.createMetricsSystem("default", conf, securityMgr)
+    metricsSystem.start()
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
@@ -49,6 +51,7 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
 
   test("MetricsSystem with sources add") {
     val metricsSystem = MetricsSystem.createMetricsSystem("test", conf, securityMgr)
+    metricsSystem.start()
     val sources = PrivateMethod[ArrayBuffer[Source]]('sources)
     val sinks = PrivateMethod[ArrayBuffer[Source]]('sinks)
 
@@ -60,4 +63,125 @@ class MetricsSystemSuite extends FunSuite with BeforeAndAfter with PrivateMethod
     metricsSystem.registerSource(source)
     assert(metricsSystem.invokePrivate(sources()).length === 1)
   }
+
+  test("MetricsSystem with Driver instance") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "driver"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === s"$appId.$executorId.${source.sourceName}")
+  }
+
+  test("MetricsSystem with Driver instance and spark.app.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val executorId = "driver"
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Driver instance and spark.executor.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    conf.set("spark.app.id", appId)
+
+    val instanceName = "driver"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Executor instance") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "executor.1"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === s"$appId.$executorId.${source.sourceName}")
+  }
+
+  test("MetricsSystem with Executor instance and spark.app.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val executorId = "executor.1"
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with Executor instance and spark.executor.id is not set") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    conf.set("spark.app.id", appId)
+
+    val instanceName = "executor"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+    assert(metricName === source.sourceName)
+  }
+
+  test("MetricsSystem with instance which is neither Driver nor Executor") {
+    val source = new Source {
+      override val sourceName = "dummySource"
+      override val metricRegistry = new MetricRegistry()
+    }
+
+    val appId = "testId"
+    val executorId = "dummyExecutorId"
+    conf.set("spark.app.id", appId)
+    conf.set("spark.executor.id", executorId)
+
+    val instanceName = "testInstance"
+    val driverMetricsSystem = MetricsSystem.createMetricsSystem(instanceName, conf, securityMgr)
+
+    val metricName = driverMetricsSystem.buildRegistryName(source)
+
+    // Even if spark.app.id and spark.executor.id are set, they are not used for the metric name.
+    assert(metricName != s"$appId.$executorId.${source.sourceName}")
+    assert(metricName === source.sourceName)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index e5315bc93e..3efa854318 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -169,7 +169,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
     // Verify logging directory exists
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logDir)
     assert(fileSystem.exists(logPath))
@@ -209,7 +211,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
 
     // Verify that all information is correctly parsed before stop()
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     eventLogger.start()
     var eventLoggingInfo = EventLoggingListener.parseLoggingInfo(eventLogger.logDir, fileSystem)
     assertInfoCorrect(eventLoggingInfo, loggerStopped = false)
@@ -228,7 +232,9 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
    */
   private def testEventLogging(compressionCodec: Option[String] = None) {
     val conf = getLoggingConf(logDirPath, compressionCodec)
-    val eventLogger = new EventLoggingListener("test", conf)
+    val logBaseDir = conf.get("spark.eventLog.dir")
+    val appId = EventLoggingListenerSuite.getUniqueApplicationId
+    val eventLogger = new EventLoggingListener(appId, logBaseDir, conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey")
@@ -408,4 +414,6 @@ object EventLoggingListenerSuite {
     }
     conf
   }
+
+  def getUniqueApplicationId = "test-" + System.currentTimeMillis
 }
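The suite changes track the new constructor, which takes the app ID and the base log directory explicitly instead of reading spark.eventLog.dir itself. A minimal usage sketch; the path and ID here are made up:

    val conf = new SparkConf().set("spark.eventLog.enabled", "true")
    val listener = new EventLoggingListener("test-1412000000000", "/tmp/spark-events", conf)
    listener.start()
    // Events are written under getLogDirPath("/tmp/spark-events", "test-1412000000000").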
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 7ab351d1b4..48114feee6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -155,7 +155,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * This child listener inherits only the event buffering functionality, but does not actually
    * log the events.
    */
-  private class EventMonster(conf: SparkConf) extends EventLoggingListener("test", conf) {
+  private class EventMonster(conf: SparkConf)
+    extends EventLoggingListener("test", "testdir", conf) {
 
     logger.close()
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
index 99c8d13231..eb6e88cf55 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/NetworkReceiverSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.streaming
 
 import java.nio.ByteBuffer
+import java.util.concurrent.Semaphore
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -36,6 +37,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
 
     val receiver = new FakeReceiver
     val executor = new FakeReceiverSupervisor(receiver)
+    val executorStarted = new Semaphore(0)
 
     assert(executor.isAllEmpty)
 
@@ -43,6 +45,7 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
     val executingThread = new Thread() {
       override def run() {
         executor.start()
+        executorStarted.release(1)
         executor.awaitTermination()
       }
     }
@@ -57,6 +60,9 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
       }
     }
 
+    // Ensure executor is started
+    executorStarted.acquire()
+
     // Verify that receiver was started
     assert(receiver.onStartCalled)
     assert(executor.isReceiverStarted)
@@ -186,10 +192,10 @@ class NetworkReceiverSuite extends FunSuite with Timeouts {
  * An implementation of NetworkReceiver that is used for testing a receiver's life cycle.
  */
 class FakeReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
-  var otherThread: Thread = null
-  var receiving = false
-  var onStartCalled = false
-  var onStopCalled = false
+  @volatile var otherThread: Thread = null
+  @volatile var receiving = false
+  @volatile var onStartCalled = false
+  @volatile var onStopCalled = false
 
   def onStart() {
     otherThread = new Thread() {
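The NetworkReceiverSuite fix replaces a race-prone check with an explicit handshake: the spawned thread releases a permit once the supervisor has started, and the assertions block on acquire() until then (with @volatile ensuring the flags written by one thread are visible to the other). The same pattern in isolation, with illustrative names:

    import java.util.concurrent.Semaphore

    val started = new Semaphore(0)
    val worker = new Thread() {
      override def run() {
        // ... start the component under test ...
        started.release(1)  // signal: startup finished
        // ... block until termination ...
      }
    }
    worker.start()
    started.acquire()       // don't assert on state before startup completes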
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 10cbeb8b94..229b7a09f4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appAttemptId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -83,7 +84,7 @@ class ExecutorRunnable(
     ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources)
+      appAttemptId, localResources)
 
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
index d7a7175d5e..5cb4753de2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -43,6 +43,7 @@ trait ExecutorRunnableUtil extends Logging {
       hostname: String,
       executorMemory: Int,
       executorCores: Int,
+      appId: String,
      localResources: HashMap[String, LocalResource]): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -114,6 +115,7 @@ trait ExecutorRunnableUtil extends Logging {
       slaveId.toString,
       hostname.toString,
       executorCores.toString,
+      appId,
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 4f4f1d2aaa..e1af8d5a74 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -57,6 +57,7 @@ object AllocationType extends Enumeration {
 private[yarn] abstract class YarnAllocator(
     conf: Configuration,
     sparkConf: SparkConf,
+    appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
@@ -295,6 +296,7 @@ private[yarn] abstract class YarnAllocator(
             executorHostname,
             executorMemory,
             executorCores,
+            appAttemptId.getApplicationId.toString,
             securityMgr)
           launcherPool.execute(executorRunnable)
         }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 200a308992..6bb4b82316 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -155,6 +155,10 @@ private[spark] class YarnClientSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = Option(appId).map(_.toString())
+  override def applicationId(): String =
+    Option(appId).map(_.toString).getOrElse {
+      logWarning("Application ID is not initialized yet.")
+      super.applicationId
+    }
 
 }
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 39436d0999..3a186cfeb4 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -48,6 +48,13 @@ private[spark] class YarnClusterSchedulerBackend(
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
 
-  override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+  override def applicationId(): String =
+    // In YARN cluster mode, spark.yarn.app.id is expected to be set before the user
+    // application is launched, so if it is not set, something has gone wrong.
+    sc.getConf.getOption("spark.yarn.app.id").getOrElse {
+      logError("Application ID is not set.")
+      super.applicationId
+    }
 
 }
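In cluster mode the backend simply trusts spark.yarn.app.id, which the YARN ApplicationMaster is expected to have set before the user application runs. The sketch below assumes that convention; the property name comes from the diff, but the exact setting site in the AM is illustrative:

    // Somewhere in the YARN ApplicationMaster, before running the user's main class (illustrative):
    sparkConf.set("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
    // YarnClusterSchedulerBackend.applicationId() then returns this value; if it is
    // missing, it logs an error and falls back to the trait's timestamp-based default.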
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 833be12982..0b5a92d87d 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -47,6 +47,7 @@ class ExecutorRunnable(
     hostname: String,
     executorMemory: Int,
     executorCores: Int,
+    appId: String,
     securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
@@ -80,7 +81,7 @@ class ExecutorRunnable(
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      localResources)
+      appId, localResources)
 
     logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e44a8db41b..2bbf5d7db8 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -41,7 +41,7 @@ private[yarn] class YarnAllocationHandler(
     args: ApplicationMasterArguments,
     preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
     securityMgr: SecurityManager)
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
+  extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
 
   override protected def releaseContainer(container: Container) = {
     amClient.releaseAssignedContainer(container.getId())