diff options
Diffstat (limited to 'core/src/main')
18 files changed, 160 insertions, 70 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 979d178c35..97109b9f41 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -187,6 +187,15 @@ class SparkContext(config: SparkConf) extends Logging { val master = conf.get("spark.master") val appName = conf.get("spark.app.name") + private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false) + private[spark] val eventLogDir: Option[String] = { + if (isEventLogEnabled) { + Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/")) + } else { + None + } + } + // Generate the random name for a temp folder in Tachyon // Add a timestamp as the suffix here to make it more safe val tachyonFolderName = "spark-" + randomUUID.toString() @@ -200,6 +209,7 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val listenerBus = new LiveListenerBus // Create the Spark execution environment (cache, map output tracker, etc) + conf.set("spark.executor.id", "driver") private[spark] val env = SparkEnv.create( conf, "<driver>", @@ -232,19 +242,6 @@ class SparkContext(config: SparkConf) extends Logging { /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf) - // Optionally log Spark events - private[spark] val eventLogger: Option[EventLoggingListener] = { - if (conf.getBoolean("spark.eventLog.enabled", false)) { - val logger = new EventLoggingListener(appName, conf, hadoopConfiguration) - logger.start() - listenerBus.addListener(logger) - Some(logger) - } else None - } - - // At this point, all relevant SparkListeners have been registered, so begin releasing events - listenerBus.start() - val startTime = System.currentTimeMillis() // Add each JAR given through the constructor @@ -309,6 +306,29 @@ class SparkContext(config: SparkConf) extends Logging { // constructor taskScheduler.start() + val applicationId: String = taskScheduler.applicationId() + conf.set("spark.app.id", applicationId) + + val metricsSystem = env.metricsSystem + + // The metrics system for Driver need to be set spark.app.id to app ID. + // So it should start after we get app ID from the task scheduler and set spark.app.id. + metricsSystem.start() + + // Optionally log Spark events + private[spark] val eventLogger: Option[EventLoggingListener] = { + if (isEventLogEnabled) { + val logger = + new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration) + logger.start() + listenerBus.addListener(logger) + Some(logger) + } else None + } + + // At this point, all relevant SparkListeners have been registered, so begin releasing events + listenerBus.start() + private[spark] val cleaner: Option[ContextCleaner] = { if (conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) @@ -411,8 +431,8 @@ class SparkContext(config: SparkConf) extends Logging { // Post init taskScheduler.postStartHook() - private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) - private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) + private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) + private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) @@ -1278,7 +1298,7 @@ class SparkContext(config: SparkConf) extends Logging { private def postApplicationStart() { // Note: this code assumes that the task scheduler has been initialized and has contacted // the cluster manager to get an application ID (in case the cluster manager provides one). - listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(), + listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId), startTime, sparkUser)) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 009ed64775..72cac42cd2 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -259,11 +259,15 @@ object SparkEnv extends Logging { } val metricsSystem = if (isDriver) { + // Don't start metrics system right now for Driver. + // We need to wait for the task scheduler to give us an app ID. + // Then we can start the metrics system. MetricsSystem.createMetricsSystem("driver", conf, securityManager) } else { - MetricsSystem.createMetricsSystem("executor", conf, securityManager) + val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager) + ms.start() + ms } - metricsSystem.start() // Set the sparkFiles directory, used when downloading dependencies. In local mode, // this is a temporary directory; in distributed mode, this is the executor's current working diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 432b552c58..f98b531316 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -33,8 +33,8 @@ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState, - SparkHadoopUtil} +import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, + ExecutorState, SparkHadoopUtil} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.deploy.master.DriverState.DriverState @@ -693,16 +693,18 @@ private[spark] class Master( app.desc.appUiUrl = notFoundBasePath return false } - val fileSystem = Utils.getHadoopFileSystem(eventLogDir, + + val appEventLogDir = EventLoggingListener.getLogDirPath(eventLogDir, app.id) + val fileSystem = Utils.getHadoopFileSystem(appEventLogDir, SparkHadoopUtil.get.newConfiguration(conf)) - val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem) + val eventLogInfo = EventLoggingListener.parseLoggingInfo(appEventLogDir, fileSystem) val eventLogPaths = eventLogInfo.logPaths val compressionCodec = eventLogInfo.compressionCodec if (eventLogPaths.isEmpty) { // Event logging is enabled for this application, but no event logs are found val title = s"Application history not found (${app.id})" - var msg = s"No event logs found for application $appName in $eventLogDir." + var msg = s"No event logs found for application $appName in $appEventLogDir." logWarning(msg) msg += " Did you specify the correct logging directory?" msg = URLEncoder.encode(msg, "UTF-8") diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 13af5b6f58..06061edfc0 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -106,6 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { executorId: String, hostname: String, cores: Int, + appId: String, workerUrl: Option[String]) { SignalLogger.register(log) @@ -122,7 +123,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { val driver = fetcher.actorSelection(driverUrl) val timeout = AkkaUtils.askTimeout(executorConf) val fut = Patterns.ask(driver, RetrieveSparkProps, timeout) - val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] + val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++ + Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() // Create a new ActorSystem using driver's Spark properties to run the backend. @@ -144,16 +146,16 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { def main(args: Array[String]) { args.length match { - case x if x < 4 => + case x if x < 5 => System.err.println( // Worker url is used in spark standalone mode to enforce fate-sharing with worker "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " + - "<cores> [<workerUrl>]") + "<cores> <appid> [<workerUrl>] ") System.exit(1) - case 4 => - run(args(0), args(1), args(2), args(3).toInt, None) - case x if x > 4 => - run(args(0), args(1), args(2), args(3).toInt, Some(args(4))) + case 5 => + run(args(0), args(1), args(2), args(3).toInt, args(4), None) + case x if x > 5 => + run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5))) } } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d7211ae465..9bbfcdc4a0 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -74,6 +74,7 @@ private[spark] class Executor( val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) + conf.set("spark.executor.id", "executor." + executorId) private val env = { if (!isLocal) { val _env = SparkEnv.create(conf, executorId, slaveHostname, 0, diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index d672158656..c4d73622c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -37,8 +37,7 @@ private[spark] class ExecutorSource(val executor: Executor, executorId: String) override val metricRegistry = new MetricRegistry() - // TODO: It would be nice to pass the application name here - override val sourceName = "executor.%s".format(executorId) + override val sourceName = "executor" // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index a42c8b43bb..bca0b15226 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -52,7 +52,8 @@ private[spark] class MesosExecutorBackend slaveInfo: SlaveInfo) { logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue) this.driver = driver - val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) + val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ + Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) executor = new Executor( executorInfo.getExecutorId.getValue, slaveInfo.getHostname, diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index fd316a89a1..5dd67b0cbf 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -83,10 +83,10 @@ private[spark] class MetricsSystem private ( def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array()) metricsConfig.initialize() - registerSources() - registerSinks() def start() { + registerSources() + registerSinks() sinks.foreach(_.start) } @@ -98,10 +98,39 @@ private[spark] class MetricsSystem private ( sinks.foreach(_.report()) } + /** + * Build a name that uniquely identifies each metric source. + * The name is structured as follows: <app ID>.<executor ID (or "driver")>.<source name>. + * If either ID is not available, this defaults to just using <source name>. + * + * @param source Metric source to be named by this method. + * @return An unique metric name for each combination of + * application, executor/driver and metric source. + */ + def buildRegistryName(source: Source): String = { + val appId = conf.getOption("spark.app.id") + val executorId = conf.getOption("spark.executor.id") + val defaultName = MetricRegistry.name(source.sourceName) + + if (instance == "driver" || instance == "executor") { + if (appId.isDefined && executorId.isDefined) { + MetricRegistry.name(appId.get, executorId.get, source.sourceName) + } else { + // Only Driver and Executor are set spark.app.id and spark.executor.id. + // For instance, Master and Worker are not related to a specific application. + val warningMsg = s"Using default name $defaultName for source because %s is not set." + if (appId.isEmpty) { logWarning(warningMsg.format("spark.app.id")) } + if (executorId.isEmpty) { logWarning(warningMsg.format("spark.executor.id")) } + defaultName + } + } else { defaultName } + } + def registerSource(source: Source) { sources += source try { - registry.register(source.sourceName, source.metricRegistry) + val regName = buildRegistryName(source) + registry.register(regName, source.metricRegistry) } catch { case e: IllegalArgumentException => logInfo("Metrics already registered", e) } @@ -109,8 +138,9 @@ private[spark] class MetricsSystem private ( def removeSource(source: Source) { sources -= source + val regName = buildRegistryName(source) registry.removeMatching(new MetricFilter { - def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName) + def matches(name: String, metric: Metric): Boolean = name.startsWith(regName) }) } @@ -125,7 +155,7 @@ private[spark] class MetricsSystem private ( val source = Class.forName(classPath).newInstance() registerSource(source.asInstanceOf[Source]) } catch { - case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e) + case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 94944399b1..12668b6c09 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { override val metricRegistry = new MetricRegistry() - override val sourceName = "%s.DAGScheduler".format(sc.appName) + override val sourceName = "DAGScheduler" metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] { override def getValue: Int = dagScheduler.failedStages.size diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index 64b32ae0ed..100c9ba9b7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -43,25 +43,23 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils} * spark.eventLog.buffer.kb - Buffer size to use when writing to output streams */ private[spark] class EventLoggingListener( - appName: String, + appId: String, + logBaseDir: String, sparkConf: SparkConf, hadoopConf: Configuration) extends SparkListener with Logging { import EventLoggingListener._ - def this(appName: String, sparkConf: SparkConf) = - this(appName, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) + def this(appId: String, logBaseDir: String, sparkConf: SparkConf) = + this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf)) private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false) private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false) private val testing = sparkConf.getBoolean("spark.eventLog.testing", false) private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024 - private val logBaseDir = sparkConf.get("spark.eventLog.dir", DEFAULT_LOG_DIR).stripSuffix("/") - private val name = appName.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_") - .toLowerCase + "-" + System.currentTimeMillis - val logDir = Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") - + val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId) + val logDirName: String = logDir.split("/").last protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS)) @@ -69,13 +67,6 @@ private[spark] class EventLoggingListener( private[scheduler] val loggedEvents = new ArrayBuffer[JValue] /** - * Return only the unique application directory without the base directory. - */ - def getApplicationLogDir(): String = { - name - } - - /** * Begin logging events. * If compression is used, log a file that indicates which compression library is used. */ @@ -185,6 +176,18 @@ private[spark] object EventLoggingListener extends Logging { } /** + * Return a file-system-safe path to the log directory for the given application. + * + * @param logBaseDir A base directory for the path to the log directory for given application. + * @param appId A unique app ID. + * @return A path which consists of file-system-safe characters. + */ + def getLogDirPath(logBaseDir: String, appId: String): String = { + val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase + Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + } + + /** * Parse the event logging information associated with the logs in the given directory. * * Specifically, this looks for event log files, the Spark version file, the compression diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index a0be8307ef..992c477493 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -23,6 +23,8 @@ package org.apache.spark.scheduler * machines become available and can launch tasks on them. */ private[spark] trait SchedulerBackend { + private val appId = "spark-application-" + System.currentTimeMillis + def start(): Unit def stop(): Unit def reviveOffers(): Unit @@ -33,10 +35,10 @@ private[spark] trait SchedulerBackend { def isReady(): Boolean = true /** - * The application ID associated with the job, if any. + * Get an application ID associated with the job. * - * @return The application ID, or None if the backend does not provide an ID. + * @return An application ID */ - def applicationId(): Option[String] = None + def applicationId(): String = appId } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 1c1ce666ea..a129a434c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -31,6 +31,8 @@ import org.apache.spark.storage.BlockManagerId */ private[spark] trait TaskScheduler { + private val appId = "spark-application-" + System.currentTimeMillis + def rootPool: Pool def schedulingMode: SchedulingMode @@ -66,10 +68,10 @@ private[spark] trait TaskScheduler { blockManagerId: BlockManagerId): Boolean /** - * The application ID associated with the job, if any. + * Get an application ID associated with the job. * - * @return The application ID, or None if the backend does not provide an ID. + * @return An application ID */ - def applicationId(): Option[String] = None + def applicationId(): String = appId } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 633e892554..4dc550413c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -492,7 +492,7 @@ private[spark] class TaskSchedulerImpl( } } - override def applicationId(): Option[String] = backend.applicationId() + override def applicationId(): String = backend.applicationId() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 5c5ecc8434..ed209d195e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -68,9 +68,8 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - val eventLogDir = sc.eventLogger.map(_.logDir) val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, eventLogDir) + appUIAddress, sc.eventLogDir) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() @@ -129,7 +128,11 @@ private[spark] class SparkDeploySchedulerBackend( totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio } - override def applicationId(): Option[String] = Option(appId) + override def applicationId(): String = + Option(appId).getOrElse { + logWarning("Application ID is not initialized yet.") + super.applicationId + } private def waitForRegistration() = { registrationLock.synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 3161f1ee9f..90828578cd 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -76,6 +76,8 @@ private[spark] class CoarseMesosSchedulerBackend( var nextMesosTaskId = 0 + @volatile var appId: String = _ + def newMesosTaskId(): Int = { val id = nextMesosTaskId nextMesosTaskId += 1 @@ -167,7 +169,8 @@ private[spark] class CoarseMesosSchedulerBackend( override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { - logInfo("Registered as framework ID " + frameworkId.getValue) + appId = frameworkId.getValue + logInfo("Registered as framework ID " + appId) registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() @@ -313,4 +316,10 @@ private[spark] class CoarseMesosSchedulerBackend( slaveLost(d, s) } + override def applicationId(): String = + Option(appId).getOrElse { + logWarning("Application ID is not initialized yet.") + super.applicationId + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 4c49aa074e..b11786368e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -30,7 +30,7 @@ import org.apache.mesos._ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _} import org.apache.spark.{Logging, SparkContext, SparkException, TaskState} -import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.scheduler._ import org.apache.spark.util.Utils /** @@ -62,6 +62,8 @@ private[spark] class MesosSchedulerBackend( var classLoader: ClassLoader = null + @volatile var appId: String = _ + override def start() { synchronized { classLoader = Thread.currentThread.getContextClassLoader @@ -177,7 +179,8 @@ private[spark] class MesosSchedulerBackend( override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { val oldClassLoader = setClassLoader() try { - logInfo("Registered as framework ID " + frameworkId.getValue) + appId = frameworkId.getValue + logInfo("Registered as framework ID " + appId) registeredLock.synchronized { isRegistered = true registeredLock.notifyAll() @@ -372,4 +375,10 @@ private[spark] class MesosSchedulerBackend( // TODO: query Mesos for number of cores override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) + override def applicationId(): String = + Option(appId).getOrElse { + logWarning("Application ID is not initialized yet.") + super.applicationId + } + } diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 9ea25c2bc7..58b78f041c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -88,6 +88,7 @@ private[spark] class LocalActor( private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) extends SchedulerBackend with ExecutorBackend { + private val appId = "local-" + System.currentTimeMillis var localActor: ActorRef = null override def start() { @@ -115,4 +116,6 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: localActor ! StatusUpdate(taskId, state, serializedData) } + override def applicationId(): String = appId + } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 49fea6d9e2..8569c6f3cb 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -22,10 +22,10 @@ import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source -private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) +private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { override val metricRegistry = new MetricRegistry() - override val sourceName = "%s.BlockManager".format(sc.appName) + override val sourceName = "BlockManager" metricRegistry.register(MetricRegistry.name("memory", "maxMem_MB"), new Gauge[Long] { override def getValue: Long = { |