-rwxr-xr-x  bin/spark-class | 8
-rwxr-xr-x  bin/spark-class2.cmd | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 26
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala | 287
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala | 76
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala | 82
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 62
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala | 43
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala | 50
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala | 146
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 65
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala | 15
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/FileSegment.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 49
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/WebUI.scala | 21
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/util/FileLogger.scala | 27
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 31
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 8
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 24
-rw-r--r--  docs/monitoring.md | 70
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala | 4
-rwxr-xr-x  sbin/start-history-server.sh | 37
-rwxr-xr-x  sbin/stop-history-server.sh | 25
38 files changed, 1075 insertions, 201 deletions
diff --git a/bin/spark-class b/bin/spark-class
index 76fde3e448..1b0d309cc5 100755
--- a/bin/spark-class
+++ b/bin/spark-class
@@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"
-# Add java opts and memory settings for master, worker, executors, and repl.
+# Add java opts and memory settings for master, worker, history server, executors, and repl.
case "$1" in
- # Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+ # Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
@@ -58,6 +58,10 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
+ 'org.apache.spark.deploy.history.HistoryServer')
+ OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
+ OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
+ ;;
# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
diff --git a/bin/spark-class2.cmd b/bin/spark-class2.cmd
index f488cfdbec..4302c1b6b7 100755
--- a/bin/spark-class2.cmd
+++ b/bin/spark-class2.cmd
@@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m
set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true
-rem Add java opts and memory settings for master, worker, executors, and repl.
-rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
+rem Add java opts and memory settings for master, worker, history server, executors, and repl.
+rem Master, Worker and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
+) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
+ set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
+ if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 76305237b0..e6c9b7000d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -219,15 +219,12 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf)
+ logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
}
- // Information needed to replay logged events, if any
- private[spark] val eventLoggingInfo: Option[EventLoggingInfo] =
- eventLogger.map { logger => Some(logger.info) }.getOrElse(None)
-
// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()
@@ -292,6 +289,7 @@ class SparkContext(config: SparkConf) extends Logging {
cleaner.foreach(_.start())
postEnvironmentUpdate()
+ postApplicationStart()
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
@@ -777,6 +775,9 @@ class SparkContext(config: SparkConf) extends Logging {
listenerBus.addListener(listener)
}
+ /** The version of Spark on which this application is running. */
+ def version = SparkContext.SPARK_VERSION
+
/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
@@ -930,6 +931,7 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
+ postApplicationEnd()
ui.stop()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
@@ -1175,6 +1177,20 @@ class SparkContext(config: SparkConf) extends Logging {
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
+ /** Post the application start event */
+ private def postApplicationStart() {
+ listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+ }
+
+ /**
+ * Post the application end event to all listeners immediately, rather than adding it
+ * to the event queue for it to be asynchronously processed eventually. Otherwise, a race
+ * condition exists in which the listeners may stop before this event has been propagated.
+ */
+ private def postApplicationEnd() {
+ listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis))
+ }
+
/** Post the environment update event once the task scheduler is ready */
private def postEnvironmentUpdate() {
if (taskScheduler != null) {
@@ -1200,6 +1216,8 @@ class SparkContext(config: SparkConf) extends Logging {
*/
object SparkContext extends Logging {
+ private[spark] val SPARK_VERSION = "1.0.0"
+
private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
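As a side note on how these pieces fit together, here is a minimal, illustrative driver that turns on event logging so the EventLoggingListener created above records the new application start and end events. The app name, master URL, and log directory are arbitrary, and the spark.eventLog.dir key is an assumption, since it is not shown in the hunks above.

    import org.apache.spark.{SparkConf, SparkContext}

    object EventLogDemo {
      def main(args: Array[String]) {
        val conf = new SparkConf()
          .setAppName("EventLogDemo")                       // arbitrary example name
          .setMaster("local[2]")
          .set("spark.eventLog.enabled", "true")            // creates the EventLoggingListener above
          .set("spark.eventLog.dir", "/tmp/spark-events")   // assumed key for the log location
        val sc = new SparkContext(conf)
        println("Running Spark " + sc.version)              // version method added in this patch
        sc.parallelize(1 to 1000).count()
        sc.stop()                                           // posts SparkListenerApplicationEnd
      }
    }
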
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 15fa8a7679..86305d2ea8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -17,8 +17,6 @@
package org.apache.spark.deploy
-import org.apache.spark.scheduler.EventLoggingInfo
-
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
@@ -26,7 +24,7 @@ private[spark] class ApplicationDescription(
val command: Command,
val sparkHome: Option[String],
var appUiUrl: String,
- val eventLogInfo: Option[EventLoggingInfo] = None)
+ val eventLogDir: Option[String] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
new file mode 100644
index 0000000000..33fceae4ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import org.apache.spark.ui.{SparkUI, WebUI}
+
+private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) {
+
+ /** Attach a SparkUI to this container. Only valid after bind(). */
+ def attachUI(ui: SparkUI) {
+ assert(serverInfo.isDefined,
+ "%s must be bound to a server before attaching SparkUIs".format(name))
+ val rootHandler = serverInfo.get.rootHandler
+ for (handler <- ui.handlers) {
+ rootHandler.addHandler(handler)
+ if (!handler.isStarted) {
+ handler.start()
+ }
+ }
+ }
+
+ /** Detach a SparkUI from this container. Only valid after bind(). */
+ def detachUI(ui: SparkUI) {
+ assert(serverInfo.isDefined,
+ "%s must be bound to a server before detaching SparkUIs".format(name))
+ val rootHandler = serverInfo.get.rootHandler
+ for (handler <- ui.handlers) {
+ if (handler.isStarted) {
+ handler.stop()
+ }
+ rootHandler.removeHandler(handler)
+ }
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
new file mode 100644
index 0000000000..97d2ba9dee
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.eclipse.jetty.servlet.ServletContextHandler
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkUIContainer
+import org.apache.spark.scheduler._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
+
+/**
+ * A web server that renders SparkUIs of completed applications.
+ *
+ * For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
+ * main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
+ *
+ * The logging directory structure is as follows: Within the given base directory, each
+ * application's event logs are maintained in the application's own sub-directory. This
+ * is the same structure as maintained in the event log write code path in
+ * EventLoggingListener.
+ *
+ * @param baseLogDir The base directory in which event logs are found
+ */
+class HistoryServer(
+ val baseLogDir: String,
+ conf: SparkConf)
+ extends SparkUIContainer("History Server") with Logging {
+
+ import HistoryServer._
+
+ private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
+ private val localHost = Utils.localHostName()
+ private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ private val port = WEB_UI_PORT
+ private val securityManager = new SecurityManager(conf)
+ private val indexPage = new IndexPage(this)
+
+ // A timestamp of when the disk was last accessed to check for log updates
+ private var lastLogCheckTime = -1L
+
+ // Number of completed applications found in this directory
+ private var numCompletedApplications = 0
+
+ @volatile private var stopped = false
+
+ /**
+ * A background thread that periodically checks for event log updates on disk.
+ *
+ * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
+ * time at which it performs the next log check to maintain the same period as before.
+ *
+ * TODO: Add a mechanism to update manually.
+ */
+ private val logCheckingThread = new Thread {
+ override def run() {
+ while (!stopped) {
+ val now = System.currentTimeMillis
+ if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
+ checkForLogs()
+ Thread.sleep(UPDATE_INTERVAL_MS)
+ } else {
+ // If the user has manually checked for logs recently, wait until
+ // UPDATE_INTERVAL_MS after the last check time
+ Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
+ }
+ }
+ }
+ }
+
+ private val handlers = Seq[ServletContextHandler](
+ createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
+ createServletHandler("/",
+ (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
+ )
+
+ // A mapping of application ID to its history information, which includes the rendered UI
+ val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
+
+ /**
+ * Start the history server.
+ *
+ * This starts a background thread that periodically synchronizes information displayed on
+ * this UI with the event logs in the provided base directory.
+ */
+ def start() {
+ logCheckingThread.start()
+ }
+
+ /** Bind to the HTTP server behind this web interface. */
+ override def bind() {
+ try {
+ serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
+ logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to bind HistoryServer", e)
+ System.exit(1)
+ }
+ }
+
+ /**
+ * Check for any updates to event logs in the base directory. This is only effective once
+ * the server has been bound.
+ *
+ * If a new completed application is found, the server renders the associated SparkUI
+ * from the application's event logs, attaches this UI to itself, and stores metadata
+ * information for this application.
+ *
+ * If the logs for an existing completed application are no longer found, the server
+ * removes all associated information and detaches the SparkUI.
+ */
+ def checkForLogs() = synchronized {
+ if (serverInfo.isDefined) {
+ lastLogCheckTime = System.currentTimeMillis
+ logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
+ try {
+ val logStatus = fileSystem.listStatus(new Path(baseLogDir))
+ val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+ val logInfos = logDirs
+ .sortBy { dir => getModificationTime(dir) }
+ .map { dir => (dir, EventLoggingListener.parseLoggingInfo(dir.getPath, fileSystem)) }
+ .filter { case (dir, info) => info.applicationComplete }
+
+ // Logging information for applications that should be retained
+ val retainedLogInfos = logInfos.takeRight(RETAINED_APPLICATIONS)
+ val retainedAppIds = retainedLogInfos.map { case (dir, _) => dir.getPath.getName }
+
+ // Remove any applications that should no longer be retained
+ appIdToInfo.foreach { case (appId, info) =>
+ if (!retainedAppIds.contains(appId)) {
+ detachUI(info.ui)
+ appIdToInfo.remove(appId)
+ }
+ }
+
+ // Render the application's UI if it is not already there
+ retainedLogInfos.foreach { case (dir, info) =>
+ val appId = dir.getPath.getName
+ if (!appIdToInfo.contains(appId)) {
+ renderSparkUI(dir, info)
+ }
+ }
+
+ // Track the total number of completed applications observed this round
+ numCompletedApplications = logInfos.size
+
+ } catch {
+ case t: Throwable => logError("Exception in checking for event log updates", t)
+ }
+ } else {
+ logWarning("Attempted to check for event log updates before binding the server.")
+ }
+ }
+
+ /**
+ * Render a new SparkUI from the event logs if the associated application is completed.
+ *
+ * HistoryServer looks for a special file that indicates application completion in the given
+ * directory. If this file exists, the associated application is regarded as completed, in
+ * which case the server proceeds to render the SparkUI. Otherwise, the server does nothing.
+ */
+ private def renderSparkUI(logDir: FileStatus, logInfo: EventLoggingInfo) {
+ val path = logDir.getPath
+ val appId = path.getName
+ val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
+ val ui = new SparkUI(replayBus, appId, "/history/" + appId)
+ val appListener = new ApplicationEventListener
+ replayBus.addListener(appListener)
+
+ // Do not call ui.bind() to avoid creating a new server for each application
+ ui.start()
+ replayBus.replay()
+ if (appListener.applicationStarted) {
+ attachUI(ui)
+ val appName = appListener.appName
+ val sparkUser = appListener.sparkUser
+ val startTime = appListener.startTime
+ val endTime = appListener.endTime
+ val lastUpdated = getModificationTime(logDir)
+ ui.setAppName(appName + " (completed)")
+ appIdToInfo(appId) = ApplicationHistoryInfo(appId, appName, startTime, endTime,
+ lastUpdated, sparkUser, path, ui)
+ }
+ }
+
+ /** Stop the server and close the file system. */
+ override def stop() {
+ super.stop()
+ stopped = true
+ fileSystem.close()
+ }
+
+ /** Return the address of this server. */
+ def getAddress: String = "http://" + publicHost + ":" + boundPort
+
+ /** Return the number of completed applications found, whether or not the UI is rendered. */
+ def getNumApplications: Int = numCompletedApplications
+
+ /** Return when this directory was last modified. */
+ private def getModificationTime(dir: FileStatus): Long = {
+ try {
+ val logFiles = fileSystem.listStatus(dir.getPath)
+ if (logFiles != null && !logFiles.isEmpty) {
+ logFiles.map(_.getModificationTime).max
+ } else {
+ dir.getModificationTime
+ }
+ } catch {
+ case t: Throwable =>
+ logError("Exception in accessing modification time of %s".format(dir.getPath), t)
+ -1L
+ }
+ }
+}
+
+/**
+ * The recommended way of starting and stopping a HistoryServer is through the scripts
+ * start-history-server.sh and stop-history-server.sh. The path to a base log directory
+ * must be specified, while the requested UI port is optional. For example:
+ *
+ * ./sbin/start-history-server.sh /tmp/spark-events
+ * ./sbin/start-history-server.sh hdfs://1.2.3.4:9000/spark-events
+ *
+ * This launches the HistoryServer as a Spark daemon.
+ */
+object HistoryServer {
+ private val conf = new SparkConf
+
+ // Interval between each check for event log updates
+ val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
+
+ // How many applications to retain
+ val RETAINED_APPLICATIONS = conf.getInt("spark.history.retainedApplications", 250)
+
+ // The port to which the web UI is bound
+ val WEB_UI_PORT = conf.getInt("spark.history.ui.port", 18080)
+
+ val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
+
+ def main(argStrings: Array[String]) {
+ val args = new HistoryServerArguments(argStrings)
+ val server = new HistoryServer(args.logDir, conf)
+ server.bind()
+ server.start()
+
+ // Wait until the end of the world... or until the HistoryServer process is manually stopped
+ while(true) { Thread.sleep(Int.MaxValue) }
+ server.stop()
+ }
+}
+
+
+private[spark] case class ApplicationHistoryInfo(
+ id: String,
+ name: String,
+ startTime: Long,
+ endTime: Long,
+ lastUpdated: Long,
+ sparkUser: String,
+ logDirPath: Path,
+ ui: SparkUI) {
+ def started = startTime != -1
+ def completed = endTime != -1
+}
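Outside of the start/stop scripts, the main() method above can be mirrored as a small embedding sketch. The base log directory below is a hypothetical example; the spark.history.* settings are read from a SparkConf as shown in the companion object.

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.history.HistoryServer

    object RunHistoryServer {
      def main(args: Array[String]) {
        val conf = new SparkConf
        // Hypothetical base directory with one sub-directory of event logs per application
        val server = new HistoryServer("hdfs://namenode:9000/spark-events", conf)
        server.bind()   // bind the web UI on spark.history.ui.port (default 18080)
        server.start()  // start the background thread that periodically calls checkForLogs()
        // Rebuilt SparkUIs are then served under /history/<appId>
      }
    }
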
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
new file mode 100644
index 0000000000..943c061743
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.net.URI
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.util.Utils
+
+/**
+ * Command-line parser for the history server.
+ */
+private[spark] class HistoryServerArguments(args: Array[String]) {
+ var logDir = ""
+
+ parse(args.toList)
+
+ private def parse(args: List[String]): Unit = {
+ args match {
+ case ("--dir" | "-d") :: value :: tail =>
+ logDir = value
+ parse(tail)
+
+ case ("--help" | "-h") :: tail =>
+ printUsageAndExit(0)
+
+ case Nil =>
+
+ case _ =>
+ printUsageAndExit(1)
+ }
+ validateLogDir()
+ }
+
+ private def validateLogDir() {
+ if (logDir == "") {
+ System.err.println("Logging directory must be specified.")
+ printUsageAndExit(1)
+ }
+ val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+ val path = new Path(logDir)
+ if (!fileSystem.exists(path)) {
+ System.err.println("Logging directory specified does not exist: %s".format(logDir))
+ printUsageAndExit(1)
+ }
+ if (!fileSystem.getFileStatus(path).isDir) {
+ System.err.println("Logging directory specified is not a directory: %s".format(logDir))
+ printUsageAndExit(1)
+ }
+ }
+
+ private def printUsageAndExit(exitCode: Int) {
+ System.err.println(
+ "Usage: HistoryServer [options]\n" +
+ "\n" +
+ "Options:\n" +
+ " -d DIR, --dir DIR Location of event log files")
+ System.exit(exitCode)
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
new file mode 100644
index 0000000000..54dffffec7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUI}
+
+private[spark] class IndexPage(parent: HistoryServer) {
+
+ def render(request: HttpServletRequest): Seq[Node] = {
+ val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
+ val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
+ val content =
+ <div class="row-fluid">
+ <div class="span12">
+ <ul class="unstyled">
+ <li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
+ </ul>
+ {
+ if (parent.appIdToInfo.size > 0) {
+ <h4>
+ Showing {parent.appIdToInfo.size}/{parent.getNumApplications}
+ Completed Application{if (parent.getNumApplications > 1) "s" else ""}
+ </h4> ++
+ appTable
+ } else {
+ <h4>No Completed Applications Found</h4>
+ }
+ }
+ </div>
+ </div>
+ UIUtils.basicSparkPage(content, "History Server")
+ }
+
+ private val appHeader = Seq(
+ "App Name",
+ "Started",
+ "Completed",
+ "Duration",
+ "Spark User",
+ "Log Directory",
+ "Last Updated")
+
+ private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+ val appName = if (info.started) info.name else info.logDirPath.getName
+ val uiAddress = parent.getAddress + info.ui.basePath
+ val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
+ val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed"
+ val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L
+ val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
+ val sparkUser = if (info.started) info.sparkUser else "Unknown user"
+ val logDirectory = info.logDirPath.getName
+ val lastUpdated = WebUI.formatDate(info.lastUpdated)
+ <tr>
+ <td><a href={uiAddress}>{appName}</a></td>
+ <td>{startTime}</td>
+ <td>{endTime}</td>
+ <td>{duration}</td>
+ <td>{sparkUser}</td>
+ <td>{logDirectory}</td>
+ <td>{lastUpdated}</td>
+ </tr>
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 95bd62e88d..2446e86cb6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,6 +29,7 @@ import akka.actor._
import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
+import org.apache.hadoop.fs.FileSystem
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState}
@@ -37,7 +38,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.scheduler.ReplayListenerBus
+import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -45,7 +46,8 @@ private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
- val securityMgr: SecurityManager) extends Actor with Logging {
+ val securityMgr: SecurityManager)
+ extends Actor with Logging {
import context.dispatcher // to use Akka's scheduler.schedule()
@@ -71,6 +73,7 @@ private[spark] class Master(
var nextAppNumber = 0
val appIdToUI = new HashMap[String, SparkUI]
+ val fileSystemsUsed = new HashSet[FileSystem]
val drivers = new HashSet[DriverInfo]
val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -149,6 +152,7 @@ private[spark] class Master(
override def postStop() {
webUi.stop()
+ fileSystemsUsed.foreach(_.close())
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
persistenceEngine.close()
@@ -630,11 +634,7 @@ private[spark] class Master(
waitingApps -= app
// If application events are logged, use them to rebuild the UI
- startPersistedSparkUI(app).map { ui =>
- app.desc.appUiUrl = ui.basePath
- appIdToUI(app.id) = ui
- webUi.attachUI(ui)
- }.getOrElse {
+ if (!rebuildSparkUI(app)) {
// Avoid broken links if the UI is not reconstructed
app.desc.appUiUrl = ""
}
@@ -654,30 +654,34 @@ private[spark] class Master(
}
/**
- * Start a new SparkUI rendered from persisted storage. If this is unsuccessful for any reason,
- * return None. Otherwise return the reconstructed UI.
+ * Rebuild a new SparkUI from the given application's event logs.
+ * Return whether this is successful.
*/
- def startPersistedSparkUI(app: ApplicationInfo): Option[SparkUI] = {
+ def rebuildSparkUI(app: ApplicationInfo): Boolean = {
val appName = app.desc.name
- val eventLogInfo = app.desc.eventLogInfo.getOrElse { return None }
- val eventLogDir = eventLogInfo.logDir
- val eventCompressionCodec = eventLogInfo.compressionCodec
- val appConf = new SparkConf
- eventCompressionCodec.foreach { codec =>
- appConf.set("spark.eventLog.compress", "true")
- appConf.set("spark.io.compression.codec", codec)
- }
- val replayerBus = new ReplayListenerBus(appConf)
- val ui = new SparkUI(
- appConf,
- replayerBus,
- "%s (finished)".format(appName),
- "/history/%s".format(app.id))
-
- // Do not call ui.bind() to avoid creating a new server for each application
- ui.start()
- val success = replayerBus.replay(eventLogDir)
- if (success) Some(ui) else None
+ val eventLogDir = app.desc.eventLogDir.getOrElse { return false }
+ val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
+ val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
+ val eventLogPaths = eventLogInfo.logPaths
+ val compressionCodec = eventLogInfo.compressionCodec
+ if (!eventLogPaths.isEmpty) {
+ try {
+ val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
+ val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id)
+ ui.start()
+ replayBus.replay()
+ app.desc.appUiUrl = ui.basePath
+ appIdToUI(app.id) = ui
+ webUi.attachUI(ui)
+ return true
+ } catch {
+ case t: Throwable =>
+ logError("Exception in replaying log for application %s (%s)".format(appName, app.id), t)
+ }
+ } else {
+ logWarning("Application %s (%s) has no valid logs: %s".format(appName, app.id, eventLogDir))
+ }
+ false
}
/** Generate a new app ID given an app's submission date */
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 01d9f52f4b..30c8ade408 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -22,8 +22,9 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.{ServerInfo, SparkUI}
+import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -31,7 +32,9 @@ import org.apache.spark.util.{AkkaUtils, Utils}
* Web UI server for the standalone master.
*/
private[spark]
-class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
+class MasterWebUI(val master: Master, requestedPort: Int)
+ extends SparkUIContainer("MasterWebUI") with Logging {
+
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
@@ -39,7 +42,6 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)
- private var serverInfo: Option[ServerInfo] = None
private val handlers: Seq[ServletContextHandler] = {
master.masterMetricsSystem.getServletHandlers ++
@@ -57,47 +59,18 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
)
}
- def bind() {
+ /** Bind to the HTTP server behind this web interface. */
+ override def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
- logError("Failed to create Master JettyUtils", e)
+ logError("Failed to create Master web UI", e)
System.exit(1)
}
}
- def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
- /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
- def attachUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.handlers) {
- rootHandler.addHandler(handler)
- if (!handler.isStarted) {
- handler.start()
- }
- }
- }
-
- /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
- def detachUI(ui: SparkUI) {
- assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
- val rootHandler = serverInfo.get.rootHandler
- for (handler <- ui.handlers) {
- if (handler.isStarted) {
- handler.stop()
- }
- rootHandler.removeHandler(handler)
- }
- }
-
- def stop() {
- assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
- serverInfo.get.server.stop()
- }
}
private[spark] object MasterWebUI {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 650f3da5ce..5625a44549 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -24,7 +24,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler
import org.apache.spark.Logging
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}
@@ -33,15 +33,14 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
- extends Logging {
+ extends WebUI("WorkerWebUI") with Logging {
val timeout = AkkaUtils.askTimeout(worker.conf)
private val host = Utils.localHostName()
private val port = requestedPort.getOrElse(
- worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
private val indexPage = new IndexPage(this)
- private var serverInfo: Option[ServerInfo] = None
private val handlers: Seq[ServletContextHandler] = {
worker.metricsSystem.getServletHandlers ++
@@ -58,19 +57,18 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
)
}
- def bind() {
+ /** Bind to the HTTP server behind this web interface. */
+ override def bind() {
try {
- serverInfo = Some(JettyUtils.startJettyServer("0.0.0.0", port, handlers, worker.conf))
+ serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
- logError("Failed to create Worker JettyUtils", e)
+ logError("Failed to create Worker web UI", e)
System.exit(1)
}
}
- def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
private def log(request: HttpServletRequest): String = {
val defaultBytes = 100 * 1024
@@ -187,13 +185,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
(startByte, endByte)
}
- def stop() {
- assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
- serverInfo.get.server.stop()
- }
}
private[spark] object WorkerWebUI {
+ val DEFAULT_PORT=8081
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR
- val DEFAULT_PORT="8081"
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
new file mode 100644
index 0000000000..affda13df6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A simple listener for application events.
+ *
+ * This listener expects to hear events from a single application only. If events
+ * from multiple applications are seen, the behavior is unspecified.
+ */
+private[spark] class ApplicationEventListener extends SparkListener {
+ var appName = "<Not Started>"
+ var sparkUser = "<Not Started>"
+ var startTime = -1L
+ var endTime = -1L
+
+ def applicationStarted = startTime != -1
+
+ def applicationFinished = endTime != -1
+
+ def applicationDuration: Long = {
+ val difference = endTime - startTime
+ if (applicationStarted && applicationFinished && difference > 0) difference else -1L
+ }
+
+ override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
+ appName = applicationStart.appName
+ startTime = applicationStart.time
+ sparkUser = applicationStart.sparkUser
+ }
+
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+ endTime = applicationEnd.time
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 217f8825c2..b983c16af1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -17,11 +17,14 @@
package org.apache.spark.scheduler
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, FileLogger}
+import org.apache.spark.util.{FileLogger, JsonProtocol}
/**
* A SparkListener that logs events to persistent storage.
@@ -36,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, FileLogger}
private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
extends SparkListener with Logging {
+ import EventLoggingListener._
+
private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
@@ -46,17 +51,21 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
private val logger =
new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
- // Information needed to replay the events logged by this listener later
- val info = {
- val compressionCodec = if (shouldCompress) {
- Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
- } else None
- EventLoggingInfo(logDir, compressionCodec)
+ /**
+ * Begin logging events.
+ * If compression is used, log a file that indicates which compression library is used.
+ */
+ def start() {
+ logInfo("Logging events to %s".format(logDir))
+ if (shouldCompress) {
+ val codec = conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
+ logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
+ }
+ logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
+ logger.newFile(LOG_PREFIX + logger.fileIndex)
}
- logInfo("Logging events to %s".format(logDir))
-
- /** Log the event as JSON */
+ /** Log the event as JSON. */
private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
logger.logLine(eventJson)
@@ -90,9 +99,118 @@ private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
logEvent(event, flushLogger = true)
override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
logEvent(event, flushLogger = true)
+ override def onApplicationStart(event: SparkListenerApplicationStart) =
+ logEvent(event, flushLogger = true)
+ override def onApplicationEnd(event: SparkListenerApplicationEnd) =
+ logEvent(event, flushLogger = true)
+
+ /**
+ * Stop logging events.
+ * In addition, create an empty special file to indicate application completion.
+ */
+ def stop() = {
+ logger.newFile(APPLICATION_COMPLETE)
+ logger.stop()
+ }
+}
+
+private[spark] object EventLoggingListener extends Logging {
+ val LOG_PREFIX = "EVENT_LOG_"
+ val SPARK_VERSION_PREFIX = "SPARK_VERSION_"
+ val COMPRESSION_CODEC_PREFIX = "COMPRESSION_CODEC_"
+ val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+
+ // A cache for compression codecs to avoid creating the same codec many times
+ private val codecMap = new mutable.HashMap[String, CompressionCodec]
+
+ def isEventLogFile(fileName: String): Boolean = {
+ fileName.startsWith(LOG_PREFIX)
+ }
+
+ def isSparkVersionFile(fileName: String): Boolean = {
+ fileName.startsWith(SPARK_VERSION_PREFIX)
+ }
+
+ def isCompressionCodecFile(fileName: String): Boolean = {
+ fileName.startsWith(COMPRESSION_CODEC_PREFIX)
+ }
+
+ def isApplicationCompleteFile(fileName: String): Boolean = {
+ fileName == APPLICATION_COMPLETE
+ }
+
+ def parseSparkVersion(fileName: String): String = {
+ if (isSparkVersionFile(fileName)) {
+ fileName.replaceAll(SPARK_VERSION_PREFIX, "")
+ } else ""
+ }
+
+ def parseCompressionCodec(fileName: String): String = {
+ if (isCompressionCodecFile(fileName)) {
+ fileName.replaceAll(COMPRESSION_CODEC_PREFIX, "")
+ } else ""
+ }
+
+ /**
+ * Parse the event logging information associated with the logs in the given directory.
+ *
+ * Specifically, this looks for event log files, the Spark version file, the compression
+ * codec file (if event logs are compressed), and the application completion file (if the
+ * application has run to completion).
+ */
+ def parseLoggingInfo(logDir: Path, fileSystem: FileSystem): EventLoggingInfo = {
+ try {
+ val fileStatuses = fileSystem.listStatus(logDir)
+ val filePaths =
+ if (fileStatuses != null) {
+ fileStatuses.filter(!_.isDir).map(_.getPath).toSeq
+ } else {
+ Seq[Path]()
+ }
+ if (filePaths.isEmpty) {
+ logWarning("No files found in logging directory %s".format(logDir))
+ }
+ EventLoggingInfo(
+ logPaths = filePaths.filter { path => isEventLogFile(path.getName) },
+ sparkVersion = filePaths
+ .find { path => isSparkVersionFile(path.getName) }
+ .map { path => parseSparkVersion(path.getName) }
+ .getOrElse("<Unknown>"),
+ compressionCodec = filePaths
+ .find { path => isCompressionCodecFile(path.getName) }
+ .map { path =>
+ val codec = EventLoggingListener.parseCompressionCodec(path.getName)
+ val conf = new SparkConf
+ conf.set("spark.io.compression.codec", codec)
+ codecMap.getOrElseUpdate(codec, CompressionCodec.createCodec(conf))
+ },
+ applicationComplete = filePaths.exists { path => isApplicationCompleteFile(path.getName) }
+ )
+ } catch {
+ case t: Throwable =>
+ logError("Exception in parsing logging info from directory %s".format(logDir), t)
+ EventLoggingInfo.empty
+ }
+ }
- def stop() = logger.stop()
+ /**
+ * Parse the event logging information associated with the logs in the given directory.
+ */
+ def parseLoggingInfo(logDir: String, fileSystem: FileSystem): EventLoggingInfo = {
+ parseLoggingInfo(new Path(logDir), fileSystem)
+ }
}
-// If compression is not enabled, compressionCodec is None
-private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
+
+/**
+ * Information needed to process the event logs associated with an application.
+ */
+private[spark] case class EventLoggingInfo(
+ logPaths: Seq[Path],
+ sparkVersion: String,
+ compressionCodec: Option[CompressionCodec],
+ applicationComplete: Boolean = false)
+
+private[spark] object EventLoggingInfo {
+ def empty = EventLoggingInfo(Seq[Path](), "<Unknown>", None, applicationComplete = false)
+}
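To make the marker-file scheme concrete, the sketch below inspects a single application's log directory and prints what parseLoggingInfo recovers: the EVENT_LOG_ files, the Spark version, the optional compression codec, and whether the APPLICATION_COMPLETE marker is present. The directory path is hypothetical, and because these helpers are private[spark], the snippet is assumed to live under the org.apache.spark package.

    package org.apache.spark.scheduler

    import org.apache.spark.util.Utils

    object InspectEventLogs {
      def main(args: Array[String]) {
        // Hypothetical directory written by one application with event logging enabled
        val logDir = "/tmp/spark-events/my-app-1234"
        val fileSystem = Utils.getHadoopFileSystem(logDir)
        val info = EventLoggingListener.parseLoggingInfo(logDir, fileSystem)
        println("Spark version: " + info.sparkVersion)
        println("Event log files: " + info.logPaths.map(_.getName).mkString(", "))
        println("Compressed: " + info.compressionCodec.isDefined)
        println("Application complete: " + info.applicationComplete)
      }
    }
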
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index db76178b65..b03665fd56 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -18,7 +18,6 @@
package org.apache.spark.scheduler
import java.io.InputStream
-import java.net.URI
import scala.io.Source
@@ -26,63 +25,47 @@ import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.Logging
import org.apache.spark.io.CompressionCodec
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.JsonProtocol
/**
- * An EventBus that replays logged events from persisted storage
+ * A SparkListenerBus that replays logged events from persisted storage.
+ *
+ * This class expects files to be appropriately prefixed as specified in EventLoggingListener.
+ * There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
*/
-private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {
- private val compressed = conf.getBoolean("spark.eventLog.compress", false)
+private[spark] class ReplayListenerBus(
+ logPaths: Seq[Path],
+ fileSystem: FileSystem,
+ compressionCodec: Option[CompressionCodec])
+ extends SparkListenerBus with Logging {
- // Only used if compression is enabled
- private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+ private var replayed = false
- /**
- * Return a list of paths representing log files in the given directory.
- */
- private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = {
- val path = new Path(logDir)
- if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
- logWarning("Log path provided is not a valid directory: %s".format(logDir))
- return Array[Path]()
- }
- val logStatus = fileSystem.listStatus(path)
- if (logStatus == null || !logStatus.exists(!_.isDir)) {
- logWarning("Log path provided contains no log files: %s".format(logDir))
- return Array[Path]()
- }
- logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName)
+ if (logPaths.length == 0) {
+ logWarning("Log path provided contains no log files.")
}
/**
* Replay each event in the order maintained in the given logs.
+ * This should be called exactly once.
*/
- def replay(logDir: String): Boolean = {
- val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- val logPaths = getLogFilePaths(logDir, fileSystem)
- if (logPaths.length == 0) {
- return false
- }
-
+ def replay() {
+ assert(!replayed, "ReplayListenerBus cannot replay events more than once")
logPaths.foreach { path =>
// Keep track of input streams at all levels to close them later
// This is necessary because an exception can occur in between stream initializations
var fileStream: Option[InputStream] = None
var bufferedStream: Option[InputStream] = None
var compressStream: Option[InputStream] = None
- var currentLine = ""
+ var currentLine = "<not started>"
try {
- currentLine = "<not started>"
fileStream = Some(fileSystem.open(path))
bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
- compressStream =
- if (compressed) {
- Some(compressionCodec.compressedInputStream(bufferedStream.get))
- } else bufferedStream
+ compressStream = Some(wrapForCompression(bufferedStream.get))
- // Parse each line as an event and post it to all attached listeners
+ // Parse each line as an event and post the event to all attached listeners
val lines = Source.fromInputStream(compressStream.get).getLines()
lines.foreach { line =>
currentLine = line
@@ -98,7 +81,11 @@ private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus
compressStream.foreach(_.close())
}
}
- fileSystem.close()
- true
+ replayed = true
+ }
+
+ /** If a compression codec is specified, wrap the given stream in a compression stream. */
+ private def wrapForCompression(stream: InputStream): InputStream = {
+ compressionCodec.map(_.compressedInputStream(stream)).getOrElse(stream)
}
}
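Building on the parsing sketch above, the following illustrates the intended replay path, much as HistoryServer.renderSparkUI and Master.rebuildSparkUI do: feed the parsed log files through a ReplayListenerBus into an ApplicationEventListener. Again the directory is hypothetical and the snippet assumes Spark-internal (private[spark]) access.

    package org.apache.spark.scheduler

    import org.apache.spark.util.Utils

    object ReplayOneApplication {
      def main(args: Array[String]) {
        val logDir = "/tmp/spark-events/my-app-1234"   // hypothetical directory
        val fileSystem = Utils.getHadoopFileSystem(logDir)
        val info = EventLoggingListener.parseLoggingInfo(logDir, fileSystem)
        val replayBus = new ReplayListenerBus(info.logPaths, fileSystem, info.compressionCodec)
        val appListener = new ApplicationEventListener
        replayBus.addListener(appListener)
        replayBus.replay()   // replays each logged event in order; valid only once
        if (appListener.applicationStarted) {
          println("Replayed application " + appListener.appName +
            " (duration: " + appListener.applicationDuration + " ms)")
        }
      }
    }
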
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index ced20350d5..378cf1aaeb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -75,6 +75,11 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
+ extends SparkListenerEvent
+
+case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
+
/** An event used in the listener to shutdown the listener daemon thread. */
private[spark] case object SparkListenerShutdown extends SparkListenerEvent
@@ -141,6 +146,16 @@ trait SparkListener {
* Called when an RDD is manually unpersisted by the application
*/
def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
+
+ /**
+ * Called when the application starts
+ */
+ def onApplicationStart(applicationStart: SparkListenerApplicationStart) { }
+
+ /**
+ * Called when the application ends
+ */
+ def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 729e120497..d6df193d9b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -61,6 +61,10 @@ private[spark] trait SparkListenerBus {
sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
case unpersistRDD: SparkListenerUnpersistRDD =>
sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+ case applicationStart: SparkListenerApplicationStart =>
+ sparkListeners.foreach(_.onApplicationStart(applicationStart))
+ case applicationEnd: SparkListenerApplicationEnd =>
+ sparkListeners.foreach(_.onApplicationEnd(applicationEnd))
case SparkListenerShutdown =>
}
}
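Since the two new events are dispatched through the same bus as every other SparkListenerEvent, any listener can consume them. The sketch below is illustrative only (the class is not part of this patch): it measures how long the application ran and is registered through SparkContext.addSparkListener.

    import org.apache.spark.scheduler._

    /** Illustrative listener that reports how long the application ran. */
    class AppDurationListener extends SparkListener {
      private var startTime = -1L

      override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
        startTime = applicationStart.time
      }

      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
        if (startTime != -1) {
          println("Application ran for %d ms".format(applicationEnd.time - startTime))
        }
      }
    }

    // Registration on a live SparkContext (sc assumed to be in scope):
    //   sc.addSparkListener(new AppDurationListener)
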
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 25b7472a99..936e9db805 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -49,7 +49,7 @@ private[spark] class SparkDeploySchedulerBackend(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo)
+ sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
diff --git a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
index 555486830a..132502b75f 100644
--- a/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FileSegment.scala
@@ -23,6 +23,6 @@ import java.io.File
* References a particular segment of a file (potentially the entire file),
* based off an offset and a length.
*/
-private[spark] class FileSegment(val file: File, val offset: Long, val length : Long) {
+private[spark] class FileSegment(val file: File, val offset: Long, val length: Long) {
override def toString = "(name=%s, offset=%d, length=%d)".format(file.getName, offset, length)
}
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index f53df7fbed..b8e6e15880 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -34,23 +34,22 @@ private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
val listenerBus: SparkListenerBus,
- val appName: String,
+ var appName: String,
val basePath: String = "")
- extends Logging {
+ extends WebUI("SparkUI") with Logging {
def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName)
- def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
- this(null, conf, listenerBus, appName, basePath)
+ def this(listenerBus: SparkListenerBus, appName: String, basePath: String) =
+ this(null, new SparkConf, listenerBus, appName, basePath)
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null
val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
- private val bindHost = Utils.localHostName()
- private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
- private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt
- private var serverInfo: Option[ServerInfo] = None
+ private val localHost = Utils.localHostName()
+ private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
+ private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
private val storage = new BlockManagerUI(this)
private val jobs = new JobProgressUI(this)
@@ -77,20 +76,10 @@ private[spark] class SparkUI(
// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
- /** Bind the HTTP server which backs this web interface */
- def bind() {
- try {
- serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
- logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
- } catch {
- case e: Exception =>
- logError("Failed to create Spark JettyUtils", e)
- System.exit(1)
- }
+ def setAppName(name: String) {
+ appName = name
}
- def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
/** Initialize all components of the server */
def start() {
storage.start()
@@ -106,9 +95,21 @@ private[spark] class SparkUI(
listenerBus.addListener(exec.listener)
}
- def stop() {
- assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!")
- serverInfo.get.server.stop()
+ /** Bind to the HTTP server behind this web interface. */
+ override def bind() {
+ try {
+ serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
+ logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort))
+ } catch {
+ case e: Exception =>
+ logError("Failed to create Spark web UI", e)
+ System.exit(1)
+ }
+ }
+
+ /** Stop the server behind this web interface. Only valid after bind(). */
+ override def stop() {
+ super.stop()
logInfo("Stopped Spark Web UI at %s".format(appUIAddress))
}
@@ -117,6 +118,6 @@ private[spark] class SparkUI(
}
private[spark] object SparkUI {
- val DEFAULT_PORT = "4040"
+ val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
}
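
The reworked lifecycle can be read roughly as follows. This is a sketch rather than code from the patch (the real call sites are elsewhere in the change), and it assumes a live SparkContext named sc:

    // Sketch of the SparkUI lifecycle after this change, assuming a live SparkContext `sc`.
    val ui = new SparkUI(sc)
    ui.start()                    // wire up the storage/jobs/environment/executor tabs and listeners
    ui.bind()                     // start Jetty; the override records the result in WebUI.serverInfo
    val port = ui.boundPort       // the actual port, only meaningful after bind()
    ui.setAppName("renamed-app")  // appName is now a var, so it can change after construction
    ui.stop()                     // WebUI.stop() asserts bind() happened, then stops the server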
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index a7b872f344..2cc7582eca 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -20,6 +20,25 @@ package org.apache.spark.ui
import java.text.SimpleDateFormat
import java.util.Date
+private[spark] abstract class WebUI(name: String) {
+ protected var serverInfo: Option[ServerInfo] = None
+
+ /**
+ * Bind to the HTTP server behind this web interface.
+   * Overriding implementations should set serverInfo.
+ */
+ def bind() { }
+
+ /** Return the actual port to which this server is bound. Only valid after bind(). */
+ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
+ /** Stop the server behind this web interface. Only valid after bind(). */
+ def stop() {
+ assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name))
+ serverInfo.get.server.stop()
+ }
+}
+
/**
* Utilities used throughout the web UI.
*/
@@ -45,6 +64,6 @@ private[spark] object WebUI {
return "%.0f min".format(minutes)
}
val hours = minutes / 60
- return "%.1f h".format(hours)
+ "%.1f h".format(hours)
}
}
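
The contract the new base class expects can be sketched with a hypothetical subclass, declared inside an org.apache.spark package since WebUI is private[spark]. ExampleUI and its port property are made up for illustration, and the Jetty call is left as a comment because its full signature is not reproduced in this hunk:

    import org.apache.spark.SparkConf

    // Hypothetical WebUI subclass: bind() starts the server and records it in
    // serverInfo, after which boundPort and stop() from the base class just work.
    private[spark] class ExampleUI(conf: SparkConf) extends WebUI("ExampleUI") {
      private val port = conf.getInt("spark.example.ui.port", 8080)  // hypothetical property

      override def bind() {
        // As SparkUI does above, an override is expected to set serverInfo, e.g.
        // serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
      }
    }
    // boundPort returns -1 until bind() succeeds; stop() asserts bind() was called first.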
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
index 23e90c34d5..33df97187e 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
@@ -29,10 +29,11 @@ import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.Page.Environment
private[ui] class EnvironmentUI(parent: SparkUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private var _listener: Option[EnvironmentListener] = None
+ private def appName = parent.appName
+
lazy val listener = _listener.get
def start() {
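
The same one-line change, replacing a private val with a private def that reads parent.appName, recurs in every UI page below. A toy sketch (not from the patch) of why the def matters now that SparkUI.appName is a var:

    // A val copies the parent's name once at construction time; a def re-reads it
    // on every access, so a later setAppName() is visible to all child pages.
    class Parent { var appName = "<not started>" }
    class PageWithVal(parent: Parent) { val appName = parent.appName }  // frozen at construction
    class PageWithDef(parent: Parent) { def appName = parent.appName }  // always current

    val parent = new Parent
    val frozen = new PageWithVal(parent)
    val live   = new PageWithDef(parent)
    parent.appName = "my-app"
    // frozen.appName is still "<not started>"; live.appName is "my-app"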
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index 031ed88a49..77a38a1d3a 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -33,10 +33,11 @@ import org.apache.spark.ui.{SparkUI, UIUtils}
import org.apache.spark.util.Utils
private[ui] class ExecutorsUI(parent: SparkUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private var _listener: Option[ExecutorsListener] = None
+ private def appName = parent.appName
+
lazy val listener = _listener.get
def start() {
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b66a4..f811aff616 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -27,13 +27,14 @@ import org.apache.spark.ui.UIUtils
/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class IndexPage(parent: JobProgressUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler
+ private def appName = parent.appName
+
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index b2c67381cc..ad1a12cdc4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -29,7 +29,6 @@ import org.apache.spark.util.Utils
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[ui] class JobProgressUI(parent: SparkUI) {
- val appName = parent.appName
val basePath = parent.basePath
val live = parent.live
val sc = parent.sc
@@ -42,6 +41,8 @@ private[ui] class JobProgressUI(parent: SparkUI) {
private val poolPage = new PoolPage(this)
private var _listener: Option[JobProgressListener] = None
+ def appName = parent.appName
+
def start() {
val conf = if (live) sc.conf else new SparkConf
_listener = Some(new JobProgressListener(conf))
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index bd33182b70..3638e6035b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -27,12 +27,13 @@ import org.apache.spark.ui.UIUtils
/** Page showing specific pool details */
private[ui] class PoolPage(parent: JobProgressUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private val live = parent.live
private val sc = parent.sc
private lazy val listener = parent.listener
+ private def appName = parent.appName
+
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0c55f2ee7e..0bcbd7461c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private lazy val listener = parent.listener
+ private def appName = parent.appName
+
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
index a7b24ff695..16996a2da1 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala
@@ -30,7 +30,6 @@ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils}
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[ui] class BlockManagerUI(parent: SparkUI) {
- val appName = parent.appName
val basePath = parent.basePath
private val indexPage = new IndexPage(this)
@@ -39,6 +38,8 @@ private[ui] class BlockManagerUI(parent: SparkUI) {
lazy val listener = _listener.get
+ def appName = parent.appName
+
def start() {
_listener = Some(new BlockManagerListener(parent.storageStatusListener))
}
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
index 0fa461e5e9..4f6acc30a8 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.Utils
/** Page showing list of RDD's currently stored in the cluster */
private[ui] class IndexPage(parent: BlockManagerUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private lazy val listener = parent.listener
+ private def appName = parent.appName
+
def render(request: HttpServletRequest): Seq[Node] = {
val rdds = listener.rddInfoList
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
index 3f42eba4ec..75ee9976d7 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala
@@ -28,10 +28,11 @@ import org.apache.spark.util.Utils
/** Page showing storage details for a given RDD */
private[ui] class RDDPage(parent: BlockManagerUI) {
- private val appName = parent.appName
private val basePath = parent.basePath
private lazy val listener = parent.listener
+ private def appName = parent.appName
+
def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index b5f2ec6831..0080a8b342 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -49,7 +49,7 @@ private[spark] class FileLogger(
}
private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
- private var fileIndex = 0
+ var fileIndex = 0
// Only used if compression is enabled
private lazy val compressionCodec = CompressionCodec.createCodec(conf)
@@ -57,10 +57,9 @@ private[spark] class FileLogger(
// Only defined if the file system scheme is not local
private var hadoopDataStream: Option[FSDataOutputStream] = None
- private var writer: Option[PrintWriter] = {
- createLogDir()
- Some(createWriter())
- }
+ private var writer: Option[PrintWriter] = None
+
+ createLogDir()
/**
* Create a logging directory with the given path.
@@ -84,8 +83,8 @@ private[spark] class FileLogger(
/**
* Create a new writer for the file identified by the given path.
*/
- private def createWriter(): PrintWriter = {
- val logPath = logDir + "/" + fileIndex
+ private def createWriter(fileName: String): PrintWriter = {
+ val logPath = logDir + "/" + fileName
val uri = new URI(logPath)
/* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -147,13 +146,17 @@ private[spark] class FileLogger(
}
/**
- * Start a writer for a new file if one does not already exit.
+ * Start a writer for a new file, closing the existing one if it exists.
+ * @param fileName Name of the new file, defaulting to the file index if not provided.
*/
- def start() {
- writer.getOrElse {
- fileIndex += 1
- writer = Some(createWriter())
+ def newFile(fileName: String = "") {
+ fileIndex += 1
+ writer.foreach(_.close())
+ val name = fileName match {
+ case "" => fileIndex.toString
+ case _ => fileName
}
+ writer = Some(createWriter(name))
}
/**
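
Hypothetical usage of the reworked FileLogger, with construction elided because the full constructor is not shown in this hunk; the file names follow directly from createWriter and newFile above:

    // Each newFile() call increments fileIndex, closes any open writer, and opens
    // either "<logDir>/<fileIndex>" or "<logDir>/<fileName>" when a name is given.
    val logger: FileLogger = makeFileLogger()  // hypothetical helper, not in the patch
    logger.newFile()               // opens <logDir>/1
    logger.newFile()               // closes 1, opens <logDir>/2
    logger.newFile("custom-name")  // closes 2, opens <logDir>/custom-name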
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 19654892bf..d990fd49ef 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -62,6 +62,10 @@ private[spark] object JsonProtocol {
blockManagerRemovedToJson(blockManagerRemoved)
case unpersistRDD: SparkListenerUnpersistRDD =>
unpersistRDDToJson(unpersistRDD)
+ case applicationStart: SparkListenerApplicationStart =>
+ applicationStartToJson(applicationStart)
+ case applicationEnd: SparkListenerApplicationEnd =>
+ applicationEndToJson(applicationEnd)
// Not used, but keeps compiler happy
case SparkListenerShutdown => JNothing
@@ -157,6 +161,18 @@ private[spark] object JsonProtocol {
("RDD ID" -> unpersistRDD.rddId)
}
+ def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
+ ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
+ ("App Name" -> applicationStart.appName) ~
+ ("Timestamp" -> applicationStart.time) ~
+ ("User" -> applicationStart.sparkUser)
+ }
+
+ def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
+ ("Event" -> Utils.getFormattedClassName(applicationEnd)) ~
+ ("Timestamp" -> applicationEnd.time)
+ }
+
/** ------------------------------------------------------------------- *
* JSON serialization methods for classes SparkListenerEvents depend on |
@@ -346,6 +362,8 @@ private[spark] object JsonProtocol {
val blockManagerAdded = Utils.getFormattedClassName(SparkListenerBlockManagerAdded)
val blockManagerRemoved = Utils.getFormattedClassName(SparkListenerBlockManagerRemoved)
val unpersistRDD = Utils.getFormattedClassName(SparkListenerUnpersistRDD)
+ val applicationStart = Utils.getFormattedClassName(SparkListenerApplicationStart)
+ val applicationEnd = Utils.getFormattedClassName(SparkListenerApplicationEnd)
(json \ "Event").extract[String] match {
case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -359,6 +377,8 @@ private[spark] object JsonProtocol {
case `blockManagerAdded` => blockManagerAddedFromJson(json)
case `blockManagerRemoved` => blockManagerRemovedFromJson(json)
case `unpersistRDD` => unpersistRDDFromJson(json)
+ case `applicationStart` => applicationStartFromJson(json)
+ case `applicationEnd` => applicationEndFromJson(json)
}
}
@@ -430,6 +450,17 @@ private[spark] object JsonProtocol {
SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
}
+ def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
+ val appName = (json \ "App Name").extract[String]
+ val time = (json \ "Timestamp").extract[Long]
+ val sparkUser = (json \ "User").extract[String]
+ SparkListenerApplicationStart(appName, time, sparkUser)
+ }
+
+ def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
+ SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+ }
+
/** --------------------------------------------------------------------- *
* JSON deserialization methods for classes SparkListenerEvents depend on |
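
A small round-trip through the newly added (de)serializers, runnable from code in an org.apache.spark package since JsonProtocol is private[spark]; the event values are illustrative:

    import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart}
    import org.apache.spark.util.JsonProtocol

    val start = SparkListenerApplicationStart("my-app", 42L, "alice")
    val startJson = JsonProtocol.applicationStartToJson(start)
    // renders as {"Event":"SparkListenerApplicationStart","App Name":"my-app","Timestamp":42,"User":"alice"}
    assert(JsonProtocol.applicationStartFromJson(startJson) == start)

    val end = SparkListenerApplicationEnd(42L)
    assert(JsonProtocol.applicationEndFromJson(JsonProtocol.applicationEndToJson(end)) == end)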
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 59da51f3e0..166f48ce73 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SortedSet
import scala.io.Source
import scala.reflect.ClassTag
@@ -1022,4 +1021,11 @@ private[spark] object Utils extends Logging {
def getHadoopFileSystem(path: URI): FileSystem = {
FileSystem.get(path, SparkHadoopUtil.get.newConfiguration())
}
+
+ /**
+ * Return a Hadoop FileSystem with the scheme encoded in the given path.
+ */
+ def getHadoopFileSystem(path: String): FileSystem = {
+ getHadoopFileSystem(new URI(path))
+ }
}
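
The new overload simply wraps the URI-based variant so callers can pass a plain path string; the hdfs URL below is illustrative only:

    import java.net.URI
    import org.apache.spark.util.Utils

    // Equivalent ways to obtain a FileSystem for an event log directory.
    val fs1 = Utils.getHadoopFileSystem("hdfs://namenode:8020/spark-events")
    val fs2 = Utils.getHadoopFileSystem(new URI("hdfs://namenode:8020/spark-events"))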
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index beac656f57..8c06a2d9aa 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, Success}
+import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor.{ShuffleReadMetrics, TaskMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0342a8aff3..f75297a02d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util
-import java.util.{Properties, UUID}
+import java.util.Properties
import scala.collection.Map
@@ -52,6 +52,8 @@ class JsonProtocolSuite extends FunSuite {
val blockManagerRemoved = SparkListenerBlockManagerRemoved(
BlockManagerId("Scarce", "to be counted...", 100, 200))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
+ val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
+ val applicationEnd = SparkListenerApplicationEnd(42L)
testEvent(stageSubmitted, stageSubmittedJsonString)
testEvent(stageCompleted, stageCompletedJsonString)
@@ -64,6 +66,8 @@ class JsonProtocolSuite extends FunSuite {
testEvent(blockManagerAdded, blockManagerAddedJsonString)
testEvent(blockManagerRemoved, blockManagerRemovedJsonString)
testEvent(unpersistRdd, unpersistRDDJsonString)
+ testEvent(applicationStart, applicationStartJsonString)
+ testEvent(applicationEnd, applicationEndJsonString)
}
test("Dependent Classes") {
@@ -208,7 +212,13 @@ class JsonProtocolSuite extends FunSuite {
case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
- assert(e1.rddId === e2.rddId)
+ assert(e1.rddId == e2.rddId)
+ case (e1: SparkListenerApplicationStart, e2: SparkListenerApplicationStart) =>
+ assert(e1.appName == e2.appName)
+ assert(e1.time == e2.time)
+ assert(e1.sparkUser == e2.sparkUser)
+ case (e1: SparkListenerApplicationEnd, e2: SparkListenerApplicationEnd) =>
+ assert(e1.time == e2.time)
case (SparkListenerShutdown, SparkListenerShutdown) =>
case _ => fail("Events don't match in types!")
}
@@ -553,4 +563,14 @@ class JsonProtocolSuite extends FunSuite {
{"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
"""
+ private val applicationStartJsonString =
+ """
+ {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42,
+ "User":"Garfield"}
+ """
+
+ private val applicationEndJsonString =
+ """
+ {"Event":"SparkListenerApplicationEnd","Timestamp":42}
+ """
}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 15bfb04178..4c91c3a592 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -12,17 +12,77 @@ displays useful information about the application. This includes:
* A list of scheduler stages and tasks
* A summary of RDD sizes and memory usage
-* Information about the running executors
* Environmental information.
+* Information about the running executors
You can access this interface by simply opening `http://<driver-node>:4040` in a web browser.
-If multiple SparkContexts are running on the same host, they will bind to succesive ports
+If multiple SparkContexts are running on the same host, they will bind to successive ports
beginning with 4040 (4041, 4042, etc).
-Spark's Standalone Mode cluster manager also has its own
-[web UI](spark-standalone.html#monitoring-and-logging).
+Note that this information is only available for the duration of the application by default.
+To view the web UI after the fact, set `spark.eventLog.enabled` to true before starting the
+application. This configures Spark to log, to persisted storage, the Spark events that encode
+the information displayed in the UI.
-Note that in both of these UIs, the tables are sortable by clicking their headers,
+## Viewing After the Fact
+
+Spark's Standalone Mode cluster manager also has its own
+[web UI](spark-standalone.html#monitoring-and-logging). If an application has logged events over
+the course of its lifetime, then the Standalone master's web UI will automatically re-render the
+application's UI after the application has finished.
+
+If Spark is run on Mesos or YARN, it is still possible to reconstruct the UI of a finished
+application through Spark's history server, provided that the application's event logs exist.
+You can start the history server by executing:
+
+ ./sbin/start-history-server.sh <base-logging-directory>
+
+The base logging directory must be supplied, and should contain sub-directories that each
+represent an application's event logs. This creates a web interface at
+`http://<server-url>:18080` by default. The history server depends on the following variables:
+
+<table class="table">
+ <tr><th style="width:21%">Environment Variable</th><th>Meaning</th></tr>
+ <tr>
+ <td><code>SPARK_DAEMON_MEMORY</code></td>
+    <td>Memory to allocate to the history server (default: 512m).</td>
+ </tr>
+ <tr>
+ <td><code>SPARK_DAEMON_JAVA_OPTS</code></td>
+ <td>JVM options for the history server (default: none).</td>
+ </tr>
+</table>
+
+Further, the history server can be configured as follows:
+
+<table class="table">
+ <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+ <tr>
+ <td>spark.history.updateInterval</td>
+ <td>10</td>
+ <td>
+ The period, in seconds, at which information displayed by this history server is updated.
+ Each update checks for any changes made to the event logs in persisted storage.
+ </td>
+ </tr>
+ <tr>
+ <td>spark.history.retainedApplications</td>
+ <td>250</td>
+ <td>
+ The number of application UIs to retain. If this cap is exceeded, then the oldest
+ applications will be removed.
+ </td>
+ </tr>
+ <tr>
+ <td>spark.history.ui.port</td>
+ <td>18080</td>
+ <td>
+ The port to which the web interface of the history server binds.
+ </td>
+ </tr>
+</table>
+
+Note that in all of these UIs, the tables are sortable by clicking their headers,
making it easy to identify slow tasks, data skew, etc.
# Metrics
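
To make the event-logging half of this section concrete, a minimal sketch of an application whose UI the history server can later replay; spark.eventLog.dir is an assumed property name not documented on this page, and its value must match the base logging directory handed to start-history-server.sh:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: enable event logging so the finished application's UI can be
    // reconstructed by the Standalone master or by the history server.
    val conf = new SparkConf()
      .setAppName("my-app")
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events")  // assumed property; must match the base log dir
    val sc = new SparkContext(conf)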
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
index 3ebf288130..910b31d209 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
@@ -116,14 +116,14 @@ trait SparkILoopInit {
}
}
- def initializeSpark() {
+ def initializeSpark() {
intp.beQuietDuring {
command("""
@transient val sc = org.apache.spark.repl.Main.interp.createSparkContext();
""")
command("import org.apache.spark.SparkContext._")
}
- echo("Spark context available as sc.")
+ echo("Spark context available as sc.")
}
// code to be executed only after the interpreter is initialized
diff --git a/sbin/start-history-server.sh b/sbin/start-history-server.sh
new file mode 100755
index 0000000000..4a90c68763
--- /dev/null
+++ b/sbin/start-history-server.sh
@@ -0,0 +1,37 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Starts the history server on the machine this script is executed on.
+#
+# Usage: start-history-server.sh <base-log-dir>
+# Example: ./start-history-server.sh /tmp/spark-events
+#
+
+sbin=`dirname "$0"`
+sbin=`cd "$sbin"; pwd`
+
+if [ $# -lt 1 ]; then
+ echo "Usage: ./start-history-server.sh <base-log-dir>"
+ echo "Example: ./start-history-server.sh /tmp/spark-events"
+ exit
+fi
+
+LOG_DIR=$1
+
+"$sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 --dir "$LOG_DIR"
diff --git a/sbin/stop-history-server.sh b/sbin/stop-history-server.sh
new file mode 100755
index 0000000000..c0034ad641
--- /dev/null
+++ b/sbin/stop-history-server.sh
@@ -0,0 +1,25 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Stops the history server on the machine this script is executed on.
+
+sbin=`dirname "$0"`
+sbin=`cd "$sbin"; pwd`
+
+"$sbin"/spark-daemon.sh stop org.apache.spark.deploy.history.HistoryServer 1