author     Josh Rosen <joshrosen@databricks.com>  2014-10-25 00:06:57 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2014-10-25 00:06:57 -0700
commit     9530316887612dca060a128fca34dd5a6ab2a9a9 (patch)
tree       c88a0a75186a5f72676f28e131368f62de8f99b6
parent     3a845d3c048eebb0bddb3937128746fde3e8e4d8 (diff)
download   spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.gz
           spark-9530316887612dca060a128fca34dd5a6ab2a9a9.tar.bz2
           spark-9530316887612dca060a128fca34dd5a6ab2a9a9.zip
[SPARK-2321] Stable pull-based progress / status API
This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)). For now, I'd like to discuss the basic implementation, API names, and overall interface design. Once we arrive at a good design, I'll go back and add additional methods to expose more information via these APIs.

#### Design goals:

- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Don't leak any implementation details, preserving our freedom to change the implementation.

#### Implementation:

- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values. These interfaces consist entirely of Java-style getter methods. The interfaces are currently implemented in Java. I decided to explicitly separate the interface from its implementation (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
- Allow an existing JobProgressListener to be used when constructing a live SparkUI. This allows us to re-use this listener in the implementation of this status API. There are a few reasons why this listener re-use makes sense:
  - The status API and web UI are guaranteed to show consistent information.
  - These listeners are already well-tested.
  - The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.

The progress API methods are implemented in a separate trait that's mixed into SparkContext. This keeps SparkContext.scala from becoming larger and more difficult to read.

Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <joshrosen@apache.org>

Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits:

e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
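For illustration (this snippet is not part of the patch), a minimal Scala sketch of how a driver program might poll the new API; it assumes a job submitted asynchronously via `countAsync()` and uses only the methods introduced in this change (`getJobInfo`, `getStageInfo`) plus the existing `FutureAction.jobIds` exercised by the test below:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings rddToAsyncRDDActions into scope for countAsync()

object StatusPollingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StatusPollingSketch"))
    // Kick off a job asynchronously so there is something to poll.
    val jobFuture = sc.parallelize(1 to 10000, 2).countAsync()

    while (!jobFuture.isCompleted) {
      Thread.sleep(1000)
      for {
        jobId     <- jobFuture.jobIds.lastOption   // empty until the job has actually been submitted
        jobInfo   <- sc.getJobInfo(jobId)          // None if unknown or already garbage-collected
        stageId   <- jobInfo.stageIds.headOption
        stageInfo <- sc.getStageInfo(stageId)
      } {
        println(s"Job $jobId is ${jobInfo.status}: " +
          s"${stageInfo.numCompletedTasks} of ${stageInfo.numTasks} tasks complete")
      }
    }
    sc.stop()
  }
}
```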
-rw-r--r--  core/src/main/java/org/apache/spark/JobExecutionStatus.java  25
-rw-r--r--  core/src/main/java/org/apache/spark/SparkJobInfo.java  30
-rw-r--r--  core/src/main/java/org/apache/spark/SparkStageInfo.java  34
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala  76
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkStatusAPI.scala  142
-rw-r--r--  core/src/main/scala/org/apache/spark/StatusAPIImpl.scala  34
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala  19
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/SparkUI.scala  108
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala  4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala  49
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala  9
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala  10
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala  3
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala  8
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala  3
-rw-r--r--  core/src/test/scala/org/apache/spark/StatusAPISuite.scala  78
-rw-r--r--  docs/configuration.md  11
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java  70
21 files changed, 588 insertions, 134 deletions
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
new file mode 100644
index 0000000000..6e16131370
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+public enum JobExecutionStatus {
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ UNKNOWN
+}
diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java
new file mode 100644
index 0000000000..4e3c983b11
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+/**
+ * Exposes information about Spark Jobs.
+ *
+ * This interface is not designed to be implemented outside of Spark. We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkJobInfo {
+ int jobId();
+ int[] stageIds();
+ JobExecutionStatus status();
+}
diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java
new file mode 100644
index 0000000000..04e2247210
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+/**
+ * Exposes information about Spark Stages.
+ *
+ * This interface is not designed to be implemented outside of Spark. We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkStageInfo {
+ int stageId();
+ int currentAttemptId();
+ String name();
+ int numTasks();
+ int numActiveTasks();
+ int numCompletedTasks();
+ int numFailedTasks();
+}
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4565832334..e8fdfff043 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
/**
@@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
* this config overrides the default configs as well as system properties.
*/
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -224,10 +224,15 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
- // Initialize the Spark UI, registering all associated listeners
+
+ private[spark] val jobProgressListener = new JobProgressListener(conf)
+ listenerBus.addListener(jobProgressListener)
+
+ // Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
- Some(new SparkUI(this))
+ Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
+ env.securityManager, appName))
} else {
// For tests, do not enable the UI
None
@@ -855,69 +860,6 @@ class SparkContext(config: SparkConf) extends Logging {
def version = SPARK_VERSION
/**
- * Return a map from the slave to the max memory available for caching and the remaining
- * memory available for caching.
- */
- def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
- env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
- (blockManagerId.host + ":" + blockManagerId.port, mem)
- }
- }
-
- /**
- * :: DeveloperApi ::
- * Return information about what RDDs are cached, if they are in mem or on disk, how much space
- * they take, etc.
- */
- @DeveloperApi
- def getRDDStorageInfo: Array[RDDInfo] = {
- val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
- StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
- rddInfos.filter(_.isCached)
- }
-
- /**
- * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
- * Note that this does not necessarily mean the caching or computation was successful.
- */
- def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
- /**
- * :: DeveloperApi ::
- * Return information about blocks stored in all of the slaves
- */
- @DeveloperApi
- def getExecutorStorageStatus: Array[StorageStatus] = {
- env.blockManager.master.getStorageStatus
- }
-
- /**
- * :: DeveloperApi ::
- * Return pools for fair scheduler
- */
- @DeveloperApi
- def getAllPools: Seq[Schedulable] = {
- // TODO(xiajunluan): We should take nested pools into account
- taskScheduler.rootPool.schedulableQueue.toSeq
- }
-
- /**
- * :: DeveloperApi ::
- * Return the pool associated with the given name, if one exists
- */
- @DeveloperApi
- def getPoolForName(pool: String): Option[Schedulable] = {
- Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
- }
-
- /**
- * Return current scheduling mode
- */
- def getSchedulingMode: SchedulingMode.SchedulingMode = {
- taskScheduler.schedulingMode
- }
-
- /**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.
*/
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
new file mode 100644
index 0000000000..1982499c5e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.Map
+import scala.collection.JavaConversions._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
+import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+
+/**
+ * Trait that implements Spark's status APIs. This trait is designed to be mixed into
+ * SparkContext; it allows the status API code to live in its own file.
+ */
+private[spark] trait SparkStatusAPI { this: SparkContext =>
+
+ /**
+ * Return a map from the slave to the max memory available for caching and the remaining
+ * memory available for caching.
+ */
+ def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+ env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+ (blockManagerId.host + ":" + blockManagerId.port, mem)
+ }
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+ * they take, etc.
+ */
+ @DeveloperApi
+ def getRDDStorageInfo: Array[RDDInfo] = {
+ val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+ StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+ rddInfos.filter(_.isCached)
+ }
+
+ /**
+ * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+ * Note that this does not necessarily mean the caching or computation was successful.
+ */
+ def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+ /**
+ * :: DeveloperApi ::
+ * Return information about blocks stored in all of the slaves
+ */
+ @DeveloperApi
+ def getExecutorStorageStatus: Array[StorageStatus] = {
+ env.blockManager.master.getStorageStatus
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Return pools for fair scheduler
+ */
+ @DeveloperApi
+ def getAllPools: Seq[Schedulable] = {
+ // TODO(xiajunluan): We should take nested pools into account
+ taskScheduler.rootPool.schedulableQueue.toSeq
+ }
+
+ /**
+ * :: DeveloperApi ::
+ * Return the pool associated with the given name, if one exists
+ */
+ @DeveloperApi
+ def getPoolForName(pool: String): Option[Schedulable] = {
+ Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+ }
+
+ /**
+ * Return current scheduling mode
+ */
+ def getSchedulingMode: SchedulingMode.SchedulingMode = {
+ taskScheduler.schedulingMode
+ }
+
+
+ /**
+ * Return a list of all known jobs in a particular job group. The returned list may contain
+ * running, failed, and completed jobs, and may vary across invocations of this method. This
+ * method does not guarantee the order of the elements in its result.
+ */
+ def getJobIdsForGroup(jobGroup: String): Array[Int] = {
+ jobProgressListener.synchronized {
+ val jobData = jobProgressListener.jobIdToData.valuesIterator
+ jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
+ }
+ }
+
+ /**
+ * Returns job information, or `None` if the job info could not be found or was garbage collected.
+ */
+ def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
+ jobProgressListener.synchronized {
+ jobProgressListener.jobIdToData.get(jobId).map { data =>
+ new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
+ }
+ }
+ }
+
+ /**
+ * Returns stage information, or `None` if the stage info could not be found or was
+ * garbage collected.
+ */
+ def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
+ jobProgressListener.synchronized {
+ for (
+ info <- jobProgressListener.stageIdToInfo.get(stageId);
+ data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
+ ) yield {
+ new SparkStageInfoImpl(
+ stageId,
+ info.attemptId,
+ info.name,
+ info.numTasks,
+ data.numActiveTasks,
+ data.numCompleteTasks,
+ data.numFailedTasks)
+ }
+ }
+ }
+}
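As a usage note (this snippet is not part of the patch), `getJobIdsForGroup` is keyed on the group set via the existing `SparkContext.setJobGroup`; a short Scala sketch, with a hypothetical group id and description:

```scala
// Sketch: track the jobs launched under one job group (assumes a running SparkContext `sc`).
sc.setJobGroup("etl-2014-10-25", "nightly ETL run")   // hypothetical group id / description
val future = sc.parallelize(1 to 1000).countAsync()   // requires `import org.apache.spark.SparkContext._`

// Later, possibly from a monitoring thread: list every known job in the group and report its status.
for (jobId <- sc.getJobIdsForGroup("etl-2014-10-25"); info <- sc.getJobInfo(jobId)) {
  println(s"Job $jobId: ${info.status}")
}
```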
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
new file mode 100644
index 0000000000..90b47c847f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+private class SparkJobInfoImpl (
+ val jobId: Int,
+ val stageIds: Array[Int],
+ val status: JobExecutionStatus)
+ extends SparkJobInfo
+
+private class SparkStageInfoImpl(
+ val stageId: Int,
+ val currentAttemptId: Int,
+ val name: String,
+ val numTasks: Int,
+ val numActiveTasks: Int,
+ val numCompletedTasks: Int,
+ val numFailedTasks: Int)
+ extends SparkStageInfo
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 791d853a01..45168ba62d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -132,6 +132,25 @@ class JavaSparkContext(val sc: SparkContext)
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
+
+ /**
+ * Return a list of all known jobs in a particular job group. The returned list may contain
+ * running, failed, and completed jobs, and may vary across invocations of this method. This
+ * method does not guarantee the order of the elements in its result.
+ */
+ def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
+
+ /**
+ * Returns job information, or `null` if the job info could not be found or was garbage collected.
+ */
+ def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
+
+ /**
+ * Returns stage information, or `null` if the stage info could not be found or was
+ * garbage collected.
+ */
+ def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
+
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 481f6c93c6..2d1609b973 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
- new SparkUI(conf, appSecManager, replayBus, appId,
+ SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 3b6bb9fe12..2f81d472d7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -721,8 +721,8 @@ private[spark] class Master(
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
- val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
- HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+ val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
+ appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index cccd59d122..049938f827 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -21,47 +21,30 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentTab
-import org.apache.spark.ui.exec.ExecutorsTab
-import org.apache.spark.ui.jobs.JobProgressTab
-import org.apache.spark.ui.storage.StorageTab
+import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
+import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
+import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
+import org.apache.spark.ui.storage.{StorageListener, StorageTab}
/**
* Top level user interface for a Spark application.
*/
-private[spark] class SparkUI(
- val sc: SparkContext,
+private[spark] class SparkUI private (
+ val sc: Option[SparkContext],
val conf: SparkConf,
val securityManager: SecurityManager,
- val listenerBus: SparkListenerBus,
+ val environmentListener: EnvironmentListener,
+ val storageStatusListener: StorageStatusListener,
+ val executorsListener: ExecutorsListener,
+ val jobProgressListener: JobProgressListener,
+ val storageListener: StorageListener,
var appName: String,
- val basePath: String = "")
+ val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
with Logging {
- def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
- def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
- this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
-
- def this(
- conf: SparkConf,
- securityManager: SecurityManager,
- listenerBus: SparkListenerBus,
- appName: String,
- basePath: String) =
- this(null, conf, securityManager, listenerBus, appName, basePath)
-
- // If SparkContext is not provided, assume the associated application is not live
- val live = sc != null
-
- // Maintain executor storage status through Spark events
- val storageStatusListener = new StorageStatusListener
-
- initialize()
-
/** Initialize all components of the server. */
def initialize() {
- listenerBus.addListener(storageStatusListener)
val jobProgressTab = new JobProgressTab(this)
attachTab(jobProgressTab)
attachTab(new StorageTab(this))
@@ -71,10 +54,10 @@ private[spark] class SparkUI(
attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
attachHandler(
createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
- if (live) {
- sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
- }
+ // If the UI is live, attach the metrics servlet handlers
+ sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
}
+ initialize()
def getAppName = appName
@@ -83,11 +66,6 @@ private[spark] class SparkUI(
appName = name
}
- /** Register the given listener with the listener bus. */
- def registerListener(listener: SparkListener) {
- listenerBus.addListener(listener)
- }
-
/** Stop the server behind this web interface. Only valid after bind(). */
override def stop() {
super.stop()
@@ -116,4 +94,60 @@ private[spark] object SparkUI {
def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
}
+
+ def createLiveUI(
+ sc: SparkContext,
+ conf: SparkConf,
+ listenerBus: SparkListenerBus,
+ jobProgressListener: JobProgressListener,
+ securityManager: SecurityManager,
+ appName: String): SparkUI = {
+ create(Some(sc), conf, listenerBus, securityManager, appName,
+ jobProgressListener = Some(jobProgressListener))
+ }
+
+ def createHistoryUI(
+ conf: SparkConf,
+ listenerBus: SparkListenerBus,
+ securityManager: SecurityManager,
+ appName: String,
+ basePath: String): SparkUI = {
+ create(None, conf, listenerBus, securityManager, appName, basePath)
+ }
+
+ /**
+ * Create a new Spark UI.
+ *
+ * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs.
+ * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the
+ * web UI will create and register its own JobProgressListener.
+ */
+ private def create(
+ sc: Option[SparkContext],
+ conf: SparkConf,
+ listenerBus: SparkListenerBus,
+ securityManager: SecurityManager,
+ appName: String,
+ basePath: String = "",
+ jobProgressListener: Option[JobProgressListener] = None): SparkUI = {
+
+ val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
+ val listener = new JobProgressListener(conf)
+ listenerBus.addListener(listener)
+ listener
+ }
+
+ val environmentListener = new EnvironmentListener
+ val storageStatusListener = new StorageStatusListener
+ val executorsListener = new ExecutorsListener(storageStatusListener)
+ val storageListener = new StorageListener(storageStatusListener)
+
+ listenerBus.addListener(environmentListener)
+ listenerBus.addListener(storageStatusListener)
+ listenerBus.addListener(executorsListener)
+ listenerBus.addListener(storageListener)
+
+ new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
+ executorsListener, _jobProgressListener, storageListener, appName, basePath)
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
index 0d158fbe63..f62260c6f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -22,10 +22,8 @@ import org.apache.spark.scheduler._
import org.apache.spark.ui._
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
- val listener = new EnvironmentListener
-
+ val listener = parent.environmentListener
attachPage(new EnvironmentPage(this))
- parent.registerListener(listener)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 61eb111cd9..689cf02b25 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -26,10 +26,9 @@ import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
- val listener = new ExecutorsListener(parent.storageStatusListener)
+ val listener = parent.executorsListener
attachPage(new ExecutorsPage(this))
- parent.registerListener(listener)
}
/**
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861f59..b520736051 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,17 +40,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
import JobProgressListener._
+ type JobId = Int
+ type StageId = Int
+ type StageAttemptId = Int
+
// How many stages to remember
val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+ // How many jobs to remember
+ val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
- // Map from stageId to StageInfo
- val activeStages = new HashMap[Int, StageInfo]
-
- // Map from (stageId, attemptId) to StageUIData
- val stageIdToData = new HashMap[(Int, Int), StageUIData]
+ val activeJobs = new HashMap[JobId, JobUIData]
+ val completedJobs = ListBuffer[JobUIData]()
+ val failedJobs = ListBuffer[JobUIData]()
+ val jobIdToData = new HashMap[JobId, JobUIData]
+ val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
+ val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
+ val stageIdToInfo = new HashMap[StageId, StageInfo]
// Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
@@ -61,8 +69,32 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
def blockManagerIds = executorIdToBlockManagerId.values.toSeq
+ override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
+ val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+ val jobData: JobUIData =
+ new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING)
+ jobIdToData(jobStart.jobId) = jobData
+ activeJobs(jobStart.jobId) = jobData
+ }
+
+ override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
+ val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
+ logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
+ new JobUIData(jobId = jobEnd.jobId)
+ }
+ jobEnd.jobResult match {
+ case JobSucceeded =>
+ completedJobs += jobData
+ jobData.status = JobExecutionStatus.SUCCEEDED
+ case JobFailed(exception) =>
+ failedJobs += jobData
+ jobData.status = JobExecutionStatus.FAILED
+ }
+ }
+
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
+ stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
logWarning("Stage completed for unknown stage " + stage.stageId)
new StageUIData
@@ -89,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
- stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
+ stages.take(toRemove).foreach { s =>
+ stageIdToData.remove((s.stageId, s.attemptId))
+ stageIdToInfo.remove(s.stageId)
+ }
stages.trimStart(toRemove)
}
}
@@ -103,6 +138,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
+ stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName
@@ -277,4 +313,5 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
+ val DEFAULT_RETAINED_JOBS = 1000
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 1e02f1225d..6e718eecdd 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
- private val live = parent.live
private val sc = parent.sc
private val listener = parent.listener
private def isFairScheduler = parent.isFairScheduler
@@ -47,17 +46,17 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
// For now, pool information is only accessible in live UIs
- val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+ val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
val poolTable = new PoolTable(pools, parent)
val summary: NodeSeq =
<div>
<ul class="unstyled">
- {if (live) {
+ {if (sc.isDefined) {
// Total duration is not meaningful unless the UI is live
<li>
<strong>Total Duration: </strong>
- {UIUtils.formatDuration(now - sc.startTime)}
+ {UIUtils.formatDuration(now - sc.get.startTime)}
</li>
}}
<li>
@@ -80,7 +79,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
</div>
val content = summary ++
- {if (live && isFairScheduler) {
+ {if (sc.isDefined && isFairScheduler) {
<h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else {
Seq[Node]()
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
index c16542c9db..03ca918e2e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -25,16 +25,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
- val live = parent.live
val sc = parent.sc
- val conf = if (live) sc.conf else new SparkConf
- val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
- val listener = new JobProgressListener(conf)
+ val conf = sc.map(_.conf).getOrElse(new SparkConf)
+ val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
+ val listener = parent.jobProgressListener
attachPage(new JobProgressPage(this))
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
- parent.registerListener(listener)
def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
@@ -43,7 +41,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
- sc.cancelStage(stageId)
+ sc.get.cancelStage(stageId)
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 7a6c7d1a49..770d99eea1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
- private val live = parent.live
private val sc = parent.sc
private val listener = parent.listener
@@ -42,7 +41,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent)
// For now, pool information is only accessible in live UIs
- val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()
+ val pools = sc.map(_.getPoolForName(poolName).get).toSeq
val poolTable = new PoolTable(pools, parent)
val content =
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index a336bf7e1e..e2813f8eb5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -17,6 +17,7 @@
package org.apache.spark.ui.jobs
+import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.util.collection.OpenHashSet
@@ -36,6 +37,13 @@ private[jobs] object UIData {
var diskBytesSpilled : Long = 0
}
+ class JobUIData(
+ var jobId: Int = -1,
+ var stageIds: Seq[Int] = Seq.empty,
+ var jobGroup: Option[String] = None,
+ var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN
+ )
+
class StageUIData {
var numActiveTasks: Int = _
var numCompleteTasks: Int = _
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 76097f1c51..a81291d505 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -26,11 +26,10 @@ import org.apache.spark.storage._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") {
- val listener = new StorageListener(parent.storageStatusListener)
+ val listener = parent.storageListener
attachPage(new StoragePage(this))
attachPage(new RDDPage(this))
- parent.registerListener(listener)
}
/**
diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
new file mode 100644
index 0000000000..4468fba8c1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.JobExecutionStatus._
+import org.apache.spark.SparkContext._
+
+class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
+
+ test("basic status API usage") {
+ val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
+ val jobId: Int = eventually(timeout(10 seconds)) {
+ val jobIds = jobFuture.jobIds
+ jobIds.size should be(1)
+ jobIds.head
+ }
+ val jobInfo = eventually(timeout(10 seconds)) {
+ sc.getJobInfo(jobId).get
+ }
+ jobInfo.status() should not be FAILED
+ val stageIds = jobInfo.stageIds()
+ stageIds.size should be(2)
+
+ val firstStageInfo = eventually(timeout(10 seconds)) {
+ sc.getStageInfo(stageIds(0)).get
+ }
+ firstStageInfo.stageId() should be(stageIds(0))
+ firstStageInfo.currentAttemptId() should be(0)
+ firstStageInfo.numTasks() should be(2)
+ eventually(timeout(10 seconds)) {
+ val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
+ updatedFirstStageInfo.numCompletedTasks() should be(2)
+ updatedFirstStageInfo.numActiveTasks() should be(0)
+ updatedFirstStageInfo.numFailedTasks() should be(0)
+ }
+ }
+
+ test("getJobIdsForGroup()") {
+ sc.setJobGroup("my-job-group", "description")
+ sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
+ val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
+ val firstJobId = eventually(timeout(10 seconds)) {
+ firstJobFuture.jobIds.head
+ }
+ eventually(timeout(10 seconds)) {
+ sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
+ }
+ val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
+ val secondJobId = eventually(timeout(10 seconds)) {
+ secondJobFuture.jobIds.head
+ }
+ eventually(timeout(10 seconds)) {
+ sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+ }
+ }
+}
\ No newline at end of file
diff --git a/docs/configuration.md b/docs/configuration.md
index 66738d3ca7..3007706a25 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -375,7 +375,16 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.retainedStages</code></td>
<td>1000</td>
<td>
- How many stages the Spark UI remembers before garbage collecting.
+ How many stages the Spark UI and status APIs remember before garbage
+ collecting.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.ui.retainedJobs</code></td>
+ <td>1000</td>
+ <td>
+ How many jobs the Spark UI and status APIs remember before garbage
+ collecting.
</td>
</tr>
<tr>
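Since the status API shares the web UI's retention limits documented above, here is a small sketch (not part of the patch) of raising them on the driver; the config keys come from the table above and the values are purely illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative values: retain more jobs and stages before the listener garbage-collects them,
// so long-running applications can still look up older jobs via getJobInfo / getStageInfo.
val conf = new SparkConf()
  .setAppName("long-running-app")
  .set("spark.ui.retainedJobs", "5000")
  .set("spark.ui.retainedStages", "5000")
val sc = new SparkContext(conf)
```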
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
new file mode 100644
index 0000000000..430e96ab14
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Example of using Spark's status APIs from Java.
+ */
+public final class JavaStatusAPIDemo {
+
+ public static final String APP_NAME = "JavaStatusAPIDemo";
+
+ public static final class IdentityWithDelay<T> implements Function<T, T> {
+ @Override
+ public T call(T x) throws Exception {
+ Thread.sleep(2 * 1000); // 2 seconds
+ return x;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
+ final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+ // Example of implementing a progress reporter for a simple job.
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+ new IdentityWithDelay<Integer>());
+ JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
+ while (!jobFuture.isDone()) {
+ Thread.sleep(1000); // 1 second
+ List<Integer> jobIds = jobFuture.jobIds();
+ if (jobIds.isEmpty()) {
+ continue;
+ }
+ int currentJobId = jobIds.get(jobIds.size() - 1);
+ SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
+ SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+ System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
+ " active, " + stageInfo.numCompletedTasks() + " complete");
+ }
+
+ System.out.println("Job results are: " + jobFuture.get());
+ sc.stop();
+ }
+}