author | Josh Rosen <joshrosen@databricks.com> | 2014-10-25 00:06:57 -0700
---|---|---
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-25 00:06:57 -0700
commit | 9530316887612dca060a128fca34dd5a6ab2a9a9 (patch) |
tree | c88a0a75186a5f72676f28e131368f62de8f99b6 |
parent | 3a845d3c048eebb0bddb3937128746fde3e8e4d8 (diff) |
download | spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.gz, spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.bz2, spark-9530316887612dca060a128fca34dd5a6ab2a9a9.zip |
[SPARK-2321] Stable pull-based progress / status API
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via these APIs.
#### Design goals:
- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Doesn't leak any implementation details, preserving our freedom to change the implementation.
#### Implementation:
- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved (see the usage sketch after this list).
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
- Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use this listener in the implementation of the status API. There are a few reasons why this listener re-use makes sense:
  - The status API and web UI are guaranteed to show consistent information.
  - These listeners are already well-tested.
  - The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.
The progress API methods are implemented in a separate trait that's mixed into SparkContext. This keeps SparkContext.scala from becoming larger and more difficult to read.
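As a usage sketch (not part of this patch), here is how a Scala driver program might poll the new API. The polling loop, object name, and timings below are illustrative assumptions; `getJobInfo`, `getStageInfo`, and the `SparkJobInfo` / `SparkStageInfo` accessors are the ones introduced in this change, and `jobIds` on the returned future is the one exercised by the new `StatusAPISuite` test:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // enables countAsync via AsyncRDDActions

// Hypothetical driver program illustrating the pull-based status API.
object StatusPollingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StatusPollingSketch"))

    // Launch a job asynchronously so this thread is free to poll for progress.
    val jobFuture = sc.parallelize(1 to 10000, 4).countAsync()

    while (!jobFuture.isCompleted) {
      Thread.sleep(1000)
      for {
        jobId     <- jobFuture.jobIds.lastOption   // ids appear once the job is submitted
        jobInfo   <- sc.getJobInfo(jobId)          // None if unknown or already GC'd
        stageId   <- jobInfo.stageIds.headOption
        stageInfo <- sc.getStageInfo(stageId)
      } {
        println(s"Job $jobId is ${jobInfo.status}: " +
          s"${stageInfo.numCompletedTasks}/${stageInfo.numTasks} tasks complete")
      }
    }
    sc.stop()
  }
}
```

Because the returned objects are immutable snapshots, each iteration of the loop re-fetches fresh info rather than holding onto a live view.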
Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <joshrosen@apache.org>
Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits:
e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
21 files changed, 588 insertions(+), 134 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java new file mode 100644 index 0000000000..6e16131370 --- /dev/null +++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +public enum JobExecutionStatus { + RUNNING, + SUCCEEDED, + FAILED, + UNKNOWN +} diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java new file mode 100644 index 0000000000..4e3c983b11 --- /dev/null +++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +/** + * Exposes information about Spark Jobs. + * + * This interface is not designed to be implemented outside of Spark. We may add additional methods + * which may break binary compatibility with outside implementations. + */ +public interface SparkJobInfo { + int jobId(); + int[] stageIds(); + JobExecutionStatus status(); +} diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java new file mode 100644 index 0000000000..04e2247210 --- /dev/null +++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark; + +/** + * Exposes information about Spark Stages. + * + * This interface is not designed to be implemented outside of Spark. We may add additional methods + * which may break binary compatibility with outside implementations. + */ +public interface SparkStageInfo { + int stageId(); + int currentAttemptId(); + String name(); + int numTasks(); + int numActiveTasks(); + int numCompletedTasks(); + int numFailedTasks(); +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4565832334..e8fdfff043 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.{Properties, UUID} import java.util.UUID.randomUUID import scala.collection.{Map, Set} -import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage._ import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.jobs.JobProgressListener import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils} /** @@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat * this config overrides the default configs as well as system properties. */ -class SparkContext(config: SparkConf) extends Logging { +class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It @@ -224,10 +224,15 @@ class SparkContext(config: SparkConf) extends Logging { private[spark] val metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf) - // Initialize the Spark UI, registering all associated listeners + + private[spark] val jobProgressListener = new JobProgressListener(conf) + listenerBus.addListener(jobProgressListener) + + // Initialize the Spark UI private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(new SparkUI(this)) + Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener, + env.securityManager,appName)) } else { // For tests, do not enable the UI None @@ -855,69 +860,6 @@ class SparkContext(config: SparkConf) extends Logging { def version = SPARK_VERSION /** - * Return a map from the slave to the max memory available for caching and the remaining - * memory available for caching. 
- */ - def getExecutorMemoryStatus: Map[String, (Long, Long)] = { - env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => - (blockManagerId.host + ":" + blockManagerId.port, mem) - } - } - - /** - * :: DeveloperApi :: - * Return information about what RDDs are cached, if they are in mem or on disk, how much space - * they take, etc. - */ - @DeveloperApi - def getRDDStorageInfo: Array[RDDInfo] = { - val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray - StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus) - rddInfos.filter(_.isCached) - } - - /** - * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. - * Note that this does not necessarily mean the caching or computation was successful. - */ - def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap - - /** - * :: DeveloperApi :: - * Return information about blocks stored in all of the slaves - */ - @DeveloperApi - def getExecutorStorageStatus: Array[StorageStatus] = { - env.blockManager.master.getStorageStatus - } - - /** - * :: DeveloperApi :: - * Return pools for fair scheduler - */ - @DeveloperApi - def getAllPools: Seq[Schedulable] = { - // TODO(xiajunluan): We should take nested pools into account - taskScheduler.rootPool.schedulableQueue.toSeq - } - - /** - * :: DeveloperApi :: - * Return the pool associated with the given name, if one exists - */ - @DeveloperApi - def getPoolForName(pool: String): Option[Schedulable] = { - Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) - } - - /** - * Return current scheduling mode - */ - def getSchedulingMode: SchedulingMode.SchedulingMode = { - taskScheduler.schedulingMode - } - - /** * Clear the job's list of files added by `addFile` so that they do not get downloaded to * any new nodes. */ diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala new file mode 100644 index 0000000000..1982499c5e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.Map +import scala.collection.JavaConversions._ + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.scheduler.{SchedulingMode, Schedulable} +import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo} + +/** + * Trait that implements Spark's status APIs. This trait is designed to be mixed into + * SparkContext; it allows the status API code to live in its own file. 
+ */ +private[spark] trait SparkStatusAPI { this: SparkContext => + + /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { + env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => + (blockManagerId.host + ":" + blockManagerId.port, mem) + } + } + + /** + * :: DeveloperApi :: + * Return information about what RDDs are cached, if they are in mem or on disk, how much space + * they take, etc. + */ + @DeveloperApi + def getRDDStorageInfo: Array[RDDInfo] = { + val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray + StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus) + rddInfos.filter(_.isCached) + } + + /** + * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. + * Note that this does not necessarily mean the caching or computation was successful. + */ + def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap + + /** + * :: DeveloperApi :: + * Return information about blocks stored in all of the slaves + */ + @DeveloperApi + def getExecutorStorageStatus: Array[StorageStatus] = { + env.blockManager.master.getStorageStatus + } + + /** + * :: DeveloperApi :: + * Return pools for fair scheduler + */ + @DeveloperApi + def getAllPools: Seq[Schedulable] = { + // TODO(xiajunluan): We should take nested pools into account + taskScheduler.rootPool.schedulableQueue.toSeq + } + + /** + * :: DeveloperApi :: + * Return the pool associated with the given name, if one exists + */ + @DeveloperApi + def getPoolForName(pool: String): Option[Schedulable] = { + Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) + } + + /** + * Return current scheduling mode + */ + def getSchedulingMode: SchedulingMode.SchedulingMode = { + taskScheduler.schedulingMode + } + + + /** + * Return a list of all known jobs in a particular job group. The returned list may contain + * running, failed, and completed jobs, and may vary across invocations of this method. This + * method does not guarantee the order of the elements in its result. + */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = { + jobProgressListener.synchronized { + val jobData = jobProgressListener.jobIdToData.valuesIterator + jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray + } + } + + /** + * Returns job information, or `None` if the job info could not be found or was garbage collected. + */ + def getJobInfo(jobId: Int): Option[SparkJobInfo] = { + jobProgressListener.synchronized { + jobProgressListener.jobIdToData.get(jobId).map { data => + new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status) + } + } + } + + /** + * Returns stage information, or `None` if the stage info could not be found or was + * garbage collected. 
+ */ + def getStageInfo(stageId: Int): Option[SparkStageInfo] = { + jobProgressListener.synchronized { + for ( + info <- jobProgressListener.stageIdToInfo.get(stageId); + data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId)) + ) yield { + new SparkStageInfoImpl( + stageId, + info.attemptId, + info.name, + info.numTasks, + data.numActiveTasks, + data.numCompleteTasks, + data.numFailedTasks) + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala new file mode 100644 index 0000000000..90b47c847f --- /dev/null +++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +private class SparkJobInfoImpl ( + val jobId: Int, + val stageIds: Array[Int], + val status: JobExecutionStatus) + extends SparkJobInfo + +private class SparkStageInfoImpl( + val stageId: Int, + val currentAttemptId: Int, + val name: String, + val numTasks: Int, + val numActiveTasks: Int, + val numCompletedTasks: Int, + val numFailedTasks: Int) + extends SparkStageInfo diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 791d853a01..45168ba62d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -132,6 +132,25 @@ class JavaSparkContext(val sc: SparkContext) /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions + + /** + * Return a list of all known jobs in a particular job group. The returned list may contain + * running, failed, and completed jobs, and may vary across invocations of this method. This + * method does not guarantee the order of the elements in its result. + */ + def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup) + + /** + * Returns job information, or `null` if the job info could not be found or was garbage collected. + */ + def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull + + /** + * Returns stage information, or `null` if the stage info could not be found or was + * garbage collected. + */ + def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull + /** Distribute a local Scala collection to form an RDD. 
*/ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 481f6c93c6..2d1609b973 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis val ui = { val conf = this.conf.clone() val appSecManager = new SecurityManager(conf) - new SparkUI(conf, appSecManager, replayBus, appId, + SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId, s"${HistoryServer.UI_PATH_PREFIX}/$appId") // Do not call ui.bind() to avoid creating a new server for each application } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 3b6bb9fe12..2f81d472d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -721,8 +721,8 @@ private[spark] class Master( try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)", - HistoryServer.UI_PATH_PREFIX + s"/${app.id}") + val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf), + appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}") replayBus.replay() appIdToUI(app.id) = ui webUi.attachSparkUI(ui) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index cccd59d122..049938f827 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -21,47 +21,30 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.env.EnvironmentTab -import org.apache.spark.ui.exec.ExecutorsTab -import org.apache.spark.ui.jobs.JobProgressTab -import org.apache.spark.ui.storage.StorageTab +import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} +import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} +import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab} +import org.apache.spark.ui.storage.{StorageListener, StorageTab} /** * Top level user interface for a Spark application. 
*/ -private[spark] class SparkUI( - val sc: SparkContext, +private[spark] class SparkUI private ( + val sc: Option[SparkContext], val conf: SparkConf, val securityManager: SecurityManager, - val listenerBus: SparkListenerBus, + val environmentListener: EnvironmentListener, + val storageStatusListener: StorageStatusListener, + val executorsListener: ExecutorsListener, + val jobProgressListener: JobProgressListener, + val storageListener: StorageListener, var appName: String, - val basePath: String = "") + val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging { - def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) - def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath) - - def this( - conf: SparkConf, - securityManager: SecurityManager, - listenerBus: SparkListenerBus, - appName: String, - basePath: String) = - this(null, conf, securityManager, listenerBus, appName, basePath) - - // If SparkContext is not provided, assume the associated application is not live - val live = sc != null - - // Maintain executor storage status through Spark events - val storageStatusListener = new StorageStatusListener - - initialize() - /** Initialize all components of the server. */ def initialize() { - listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) attachTab(new StorageTab(this)) @@ -71,10 +54,10 @@ private[spark] class SparkUI( attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) - if (live) { - sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) - } + // If the UI is live, then serve + sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) } } + initialize() def getAppName = appName @@ -83,11 +66,6 @@ private[spark] class SparkUI( appName = name } - /** Register the given listener with the listener bus. */ - def registerListener(listener: SparkListener) { - listenerBus.addListener(listener) - } - /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() @@ -116,4 +94,60 @@ private[spark] object SparkUI { def getUIPort(conf: SparkConf): Int = { conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) } + + def createLiveUI( + sc: SparkContext, + conf: SparkConf, + listenerBus: SparkListenerBus, + jobProgressListener: JobProgressListener, + securityManager: SecurityManager, + appName: String): SparkUI = { + create(Some(sc), conf, listenerBus, securityManager, appName, + jobProgressListener = Some(jobProgressListener)) + } + + def createHistoryUI( + conf: SparkConf, + listenerBus: SparkListenerBus, + securityManager: SecurityManager, + appName: String, + basePath: String): SparkUI = { + create(None, conf, listenerBus, securityManager, appName, basePath) + } + + /** + * Create a new Spark UI. + * + * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs. + * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the + * web UI will create and register its own JobProgressListener. 
+ */ + private def create( + sc: Option[SparkContext], + conf: SparkConf, + listenerBus: SparkListenerBus, + securityManager: SecurityManager, + appName: String, + basePath: String = "", + jobProgressListener: Option[JobProgressListener] = None): SparkUI = { + + val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { + val listener = new JobProgressListener(conf) + listenerBus.addListener(listener) + listener + } + + val environmentListener = new EnvironmentListener + val storageStatusListener = new StorageStatusListener + val executorsListener = new ExecutorsListener(storageStatusListener) + val storageListener = new StorageListener(storageStatusListener) + + listenerBus.addListener(environmentListener) + listenerBus.addListener(storageStatusListener) + listenerBus.addListener(executorsListener) + listenerBus.addListener(storageListener) + + new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, + executorsListener, _jobProgressListener, storageListener, appName, basePath) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index 0d158fbe63..f62260c6f6 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -22,10 +22,8 @@ import org.apache.spark.scheduler._ import org.apache.spark.ui._ private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") { - val listener = new EnvironmentListener - + val listener = parent.environmentListener attachPage(new EnvironmentPage(this)) - parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 61eb111cd9..689cf02b25 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -26,10 +26,9 @@ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.{SparkUI, SparkUITab} private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") { - val listener = new ExecutorsListener(parent.storageStatusListener) + val listener = parent.executorsListener attachPage(new ExecutorsPage(this)) - parent.registerListener(listener) } /** diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index eaeb861f59..b520736051 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -40,17 +40,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { import JobProgressListener._ + type JobId = Int + type StageId = Int + type StageAttemptId = Int + // How many stages to remember val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) + // How many jobs to remember + val retailedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS) - // Map from stageId to StageInfo - val activeStages = new HashMap[Int, StageInfo] - - // Map from (stageId, attemptId) to StageUIData - val stageIdToData = new HashMap[(Int, Int), StageUIData] + val activeJobs = new HashMap[JobId, JobUIData] + val completedJobs = ListBuffer[JobUIData]() + val failedJobs = ListBuffer[JobUIData]() + val 
jobIdToData = new HashMap[JobId, JobUIData] + val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() + val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] + val stageIdToInfo = new HashMap[StageId, StageInfo] // Map from pool name to a hash map (map from stage id to StageInfo). val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]() @@ -61,8 +69,32 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { def blockManagerIds = executorIdToBlockManagerId.values.toSeq + override def onJobStart(jobStart: SparkListenerJobStart) = synchronized { + val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) + val jobData: JobUIData = + new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING) + jobIdToData(jobStart.jobId) = jobData + activeJobs(jobStart.jobId) = jobData + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized { + val jobData = activeJobs.remove(jobEnd.jobId).getOrElse { + logWarning(s"Job completed for unknown job ${jobEnd.jobId}") + new JobUIData(jobId = jobEnd.jobId) + } + jobEnd.jobResult match { + case JobSucceeded => + completedJobs += jobData + jobData.status = JobExecutionStatus.SUCCEEDED + case JobFailed(exception) => + failedJobs += jobData + jobData.status = JobExecutionStatus.FAILED + } + } + override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized { val stage = stageCompleted.stageInfo + stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), { logWarning("Stage completed for unknown stage " + stage.stageId) new StageUIData @@ -89,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized { if (stages.size > retainedStages) { val toRemove = math.max(retainedStages / 10, 1) - stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) } + stages.take(toRemove).foreach { s => + stageIdToData.remove((s.stageId, s.attemptId)) + stageIdToInfo.remove(s.stageId) + } stages.trimStart(toRemove) } } @@ -103,6 +138,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME) }.getOrElse(DEFAULT_POOL_NAME) + stageIdToInfo(stage.stageId) = stage val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData) stageData.schedulingPool = poolName @@ -277,4 +313,5 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { private object JobProgressListener { val DEFAULT_POOL_NAME = "default" val DEFAULT_RETAINED_STAGES = 1000 + val DEFAULT_RETAINED_JOBS = 1000 } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index 1e02f1225d..6e718eecdd 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") { - private val live = parent.live private val sc = parent.sc private val listener = 
parent.listener private def isFairScheduler = parent.isFairScheduler @@ -47,17 +46,17 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent) // For now, pool information is only accessible in live UIs - val pools = if (live) sc.getAllPools else Seq[Schedulable]() + val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) val poolTable = new PoolTable(pools, parent) val summary: NodeSeq = <div> <ul class="unstyled"> - {if (live) { + {if (sc.isDefined) { // Total duration is not meaningful unless the UI is live <li> <strong>Total Duration: </strong> - {UIUtils.formatDuration(now - sc.startTime)} + {UIUtils.formatDuration(now - sc.get.startTime)} </li> }} <li> @@ -80,7 +79,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") </div> val content = summary ++ - {if (live && isFairScheduler) { + {if (sc.isDefined && isFairScheduler) { <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq } else { Seq[Node]() diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index c16542c9db..03ca918e2e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -25,16 +25,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") { - val live = parent.live val sc = parent.sc - val conf = if (live) sc.conf else new SparkConf - val killEnabled = conf.getBoolean("spark.ui.killEnabled", true) - val listener = new JobProgressListener(conf) + val conf = sc.map(_.conf).getOrElse(new SparkConf) + val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) + val listener = parent.jobProgressListener attachPage(new JobProgressPage(this)) attachPage(new StagePage(this)) attachPage(new PoolPage(this)) - parent.registerListener(listener) def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) @@ -43,7 +41,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) { - sc.cancelStage(stageId) + sc.get.cancelStage(stageId) } // Do a quick pause here to give Spark time to kill the stage so it shows up as // killed after the refresh. 
Note that this will block the serving thread so the diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 7a6c7d1a49..770d99eea1 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { - private val live = parent.live private val sc = parent.sc private val listener = parent.listener @@ -42,7 +41,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent) // For now, pool information is only accessible in live UIs - val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() + val pools = sc.map(_.getPoolForName(poolName).get).toSeq val poolTable = new PoolTable(pools, parent) val content = diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index a336bf7e1e..e2813f8eb5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -17,6 +17,7 @@ package org.apache.spark.ui.jobs +import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo} import org.apache.spark.util.collection.OpenHashSet @@ -36,6 +37,13 @@ private[jobs] object UIData { var diskBytesSpilled : Long = 0 } + class JobUIData( + var jobId: Int = -1, + var stageIds: Seq[Int] = Seq.empty, + var jobGroup: Option[String] = None, + var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN + ) + class StageUIData { var numActiveTasks: Int = _ var numCompleteTasks: Int = _ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 76097f1c51..a81291d505 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -26,11 +26,10 @@ import org.apache.spark.storage._ /** Web UI showing storage status of all RDD's in the given SparkContext. */ private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") { - val listener = new StorageListener(parent.storageStatusListener) + val listener = parent.storageListener attachPage(new StoragePage(this)) attachPage(new RDDPage(this)) - parent.registerListener(listener) } /** diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala new file mode 100644 index 0000000000..4468fba8c1 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.concurrent.duration._ +import scala.language.implicitConversions +import scala.language.postfixOps + +import org.scalatest.{Matchers, FunSuite} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark.JobExecutionStatus._ +import org.apache.spark.SparkContext._ + +class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext { + + test("basic status API usage") { + val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync() + val jobId: Int = eventually(timeout(10 seconds)) { + val jobIds = jobFuture.jobIds + jobIds.size should be(1) + jobIds.head + } + val jobInfo = eventually(timeout(10 seconds)) { + sc.getJobInfo(jobId).get + } + jobInfo.status() should not be FAILED + val stageIds = jobInfo.stageIds() + stageIds.size should be(2) + + val firstStageInfo = eventually(timeout(10 seconds)) { + sc.getStageInfo(stageIds(0)).get + } + firstStageInfo.stageId() should be(stageIds(0)) + firstStageInfo.currentAttemptId() should be(0) + firstStageInfo.numTasks() should be(2) + eventually(timeout(10 seconds)) { + val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get + updatedFirstStageInfo.numCompletedTasks() should be(2) + updatedFirstStageInfo.numActiveTasks() should be(0) + updatedFirstStageInfo.numFailedTasks() should be(0) + } + } + + test("getJobIdsForGroup()") { + sc.setJobGroup("my-job-group", "description") + sc.getJobIdsForGroup("my-job-group") should be (Seq.empty) + val firstJobFuture = sc.parallelize(1 to 1000).countAsync() + val firstJobId = eventually(timeout(10 seconds)) { + firstJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId)) + } + val secondJobFuture = sc.parallelize(1 to 1000).countAsync() + val secondJobId = eventually(timeout(10 seconds)) { + secondJobFuture.jobIds.head + } + eventually(timeout(10 seconds)) { + sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId)) + } + } +}
\ No newline at end of file diff --git a/docs/configuration.md b/docs/configuration.md index 66738d3ca7..3007706a25 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -375,7 +375,16 @@ Apart from these, the following properties are also available, and may be useful <td><code>spark.ui.retainedStages</code></td> <td>1000</td> <td> - How many stages the Spark UI remembers before garbage collecting. + How many stages the Spark UI and status APIs remember before garbage + collecting. + </td> +</tr> +<tr> + <td><code>spark.ui.retainedJobs</code></td> + <td>1000</td> + <td> + How many stages the Spark UI and status APIs remember before garbage + collecting. </td> </tr> <tr> diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java new file mode 100644 index 0000000000..430e96ab14 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples; + +import org.apache.spark.SparkConf; +import org.apache.spark.SparkJobInfo; +import org.apache.spark.SparkStageInfo; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; + +import java.util.Arrays; +import java.util.List; + +/** + * Example of using Spark's status APIs from Java. + */ +public final class JavaStatusAPIDemo { + + public static final String APP_NAME = "JavaStatusAPIDemo"; + + public static final class IdentityWithDelay<T> implements Function<T, T> { + @Override + public T call(T x) throws Exception { + Thread.sleep(2 * 1000); // 2 seconds + return x; + } + } + + public static void main(String[] args) throws Exception { + SparkConf sparkConf = new SparkConf().setAppName(APP_NAME); + final JavaSparkContext sc = new JavaSparkContext(sparkConf); + + // Example of implementing a progress reporter for a simple job. 
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map( + new IdentityWithDelay<Integer>()); + JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync(); + while (!jobFuture.isDone()) { + Thread.sleep(1000); // 1 second + List<Integer> jobIds = jobFuture.jobIds(); + if (jobIds.isEmpty()) { + continue; + } + int currentJobId = jobIds.get(jobIds.size() - 1); + SparkJobInfo jobInfo = sc.getJobInfo(currentJobId); + SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]); + System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() + + " active, " + stageInfo.numCompletedTasks() + " complete"); + } + + System.out.println("Job results are: " + jobFuture.get()); + sc.stop(); + } +} |
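As a companion to the Java demo above, here is a hedged Scala sketch of tracking a whole job group using `setJobGroup` together with the new `getJobIdsForGroup`. The object name, group name, and loop are illustrative assumptions; the status methods are the ones added in this patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // enables countAsync via AsyncRDDActions

// Hypothetical example: poll the status of every job in a job group.
object JobGroupStatusSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JobGroupStatusSketch"))

    // Tag all jobs submitted from this thread with a job group.
    sc.setJobGroup("etl-group", "nightly ETL jobs")
    val future = sc.parallelize(1 to 100000, 8).countAsync()

    while (!future.isCompleted) {
      Thread.sleep(1000)
      // getJobIdsForGroup makes no ordering guarantees, so sort for display.
      val jobIds = sc.getJobIdsForGroup("etl-group").sorted
      val statuses = jobIds.flatMap(id => sc.getJobInfo(id)).map(_.status)
      println(s"Jobs in group: ${jobIds.mkString(", ")}; " +
        s"statuses: ${statuses.mkString(", ")}")
    }
    sc.stop()
  }
}
```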