author     Reynold Xin <rxin@apache.org>    2014-08-20 15:37:27 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-08-20 15:37:27 -0700
commit     fb60bec34e0b20ae95b4b865a79744916e0a5737
tree       a9f9bb8d0d0e749b72f7b38c52b6a5e0f7a4985f
parent     b3ec51bfd795772ff96d18228e979a52ebc82ec4
[SPARK-2298] Encode stage attempt in SparkListener & UI.
Simple way to reproduce this in the UI:

```scala
val f = new java.io.File("/tmp/test")
f.delete()
sc.parallelize(1 to 2, 2).map(x => (x, x)).repartition(3).mapPartitionsWithContext { case (context, iter) =>
  if (context.partitionId == 0) {
    val f = new java.io.File("/tmp/test")
    if (!f.exists) {
      f.mkdir()
      System.exit(0);
    }
  }
  iter
}.count()
```

Author: Reynold Xin <rxin@apache.org>

Closes #1545 from rxin/stage-attempt and squashes the following commits:

3ee1d2a [Reynold Xin] - Rename attempt to retry in UI. - Properly report stage failure in FetchFailed.
40a6bd5 [Reynold Xin] Updated test suites.
c414c36 [Reynold Xin] Fixed the hanging in JobCancellationSuite.
b3e2eed [Reynold Xin] Oops previous code didn't compile.
0f36075 [Reynold Xin] Mark unknown stage attempt with id -1 and drop that in JobProgressListener.
6c08b07 [Reynold Xin] Addressed code review feedback.
4e5faa2 [Reynold Xin] [SPARK-2298] Encode stage attempt in SparkListener & UI.
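As added context for readers of this commit page: the sketch below is not part of the patch; it shows how a user-defined SparkListener could consume the new `stageAttemptId` fields once this change is in. It assumes the post-patch 1.1-era listener API; `AttemptAwareListener` and the log messages are illustrative names only.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Illustrative sketch (not part of this patch): a listener that distinguishes
// stage attempts using the stageAttemptId fields introduced by this commit.
class AttemptAwareListener extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart) {
    println(s"task started: stage ${taskStart.stageId}, attempt ${taskStart.stageAttemptId}")
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    // -1 is the sentinel this commit uses when the DAGScheduler no longer
    // knows the stage (e.g. the task finished after the stage was cancelled).
    if (taskEnd.stageAttemptId == -1) {
      println(s"task end for unknown attempt of stage ${taskEnd.stageId}")
    } else {
      println(s"task ended: stage ${taskEnd.stageId}, attempt ${taskEnd.stageAttemptId}")
    }
  }
}

// Registration on an existing SparkContext `sc`:
// sc.addSparkListener(new AttemptAwareListener)
```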
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala              |  77
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala             |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/Stage.scala                     |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala                 |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala         |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala                   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala               |   6
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala         |  40
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala                   |  11
-rw-r--r--  core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala                  |  14
-rw-r--r--  core/src/main/scala/org/apache/spark/util/JsonProtocol.scala                   |  12
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala  |  17
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala    |  68
-rw-r--r--  core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala          |  16
-rw-r--r--  core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala              | 476
15 files changed, 555 insertions, 224 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b86cfbfa48..3413198457 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -164,7 +164,7 @@ class DAGScheduler(
*/
def executorHeartbeatReceived(
execId: String,
- taskMetrics: Array[(Long, Int, TaskMetrics)], // (taskId, stageId, metrics)
+ taskMetrics: Array[(Long, Int, Int, TaskMetrics)], // (taskId, stageId, stageAttemptId, metrics)
blockManagerId: BlockManagerId): Boolean = {
listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, taskMetrics))
implicit val timeout = Timeout(600 seconds)
@@ -677,7 +677,10 @@ class DAGScheduler(
}
private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
- listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
+ // Note that there is a chance that this task is launched after the stage is cancelled.
+ // In that case, we wouldn't have the stage anymore in stageIdToStage.
+ val stageAttemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+ listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, taskInfo))
submitWaitingStages()
}
@@ -695,8 +698,8 @@ class DAGScheduler(
// is in the process of getting stopped.
val stageFailedMessage = "Stage cancelled because SparkContext was shut down"
runningStages.foreach { stage =>
- stage.info.stageFailed(stageFailedMessage)
- listenerBus.post(SparkListenerStageCompleted(stage.info))
+ stage.latestInfo.stageFailed(stageFailedMessage)
+ listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
}
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error)))
}
@@ -781,7 +784,16 @@ class DAGScheduler(
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingTasks.clear()
- var tasks = ArrayBuffer[Task[_]]()
+
+ // First figure out the indexes of partition ids to compute.
+ val partitionsToCompute: Seq[Int] = {
+ if (stage.isShuffleMap) {
+ (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
+ } else {
+ val job = stage.resultOfJob.get
+ (0 until job.numPartitions).filter(id => !job.finished(id))
+ }
+ }
val properties = if (jobIdToActiveJob.contains(jobId)) {
jobIdToActiveJob(stage.jobId).properties
@@ -795,7 +807,8 @@ class DAGScheduler(
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
- listenerBus.post(SparkListenerStageSubmitted(stage.info, properties))
+ stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))
+ listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
@@ -826,20 +839,19 @@ class DAGScheduler(
return
}
- if (stage.isShuffleMap) {
- for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
- val locs = getPreferredLocs(stage.rdd, p)
- val part = stage.rdd.partitions(p)
- tasks += new ShuffleMapTask(stage.id, taskBinary, part, locs)
+ val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
+ partitionsToCompute.map { id =>
+ val locs = getPreferredLocs(stage.rdd, id)
+ val part = stage.rdd.partitions(id)
+ new ShuffleMapTask(stage.id, taskBinary, part, locs)
}
} else {
- // This is a final stage; figure out its job's missing partitions
val job = stage.resultOfJob.get
- for (id <- 0 until job.numPartitions if !job.finished(id)) {
+ partitionsToCompute.map { id =>
val p: Int = job.partitions(id)
val part = stage.rdd.partitions(p)
val locs = getPreferredLocs(stage.rdd, p)
- tasks += new ResultTask(stage.id, taskBinary, part, locs, id)
+ new ResultTask(stage.id, taskBinary, part, locs, id)
}
}
@@ -869,11 +881,11 @@ class DAGScheduler(
logDebug("New pending tasks: " + stage.pendingTasks)
taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
- stage.info.submissionTime = Some(clock.getTime())
+ stage.latestInfo.submissionTime = Some(clock.getTime())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should post
// SparkListenerStageCompleted here in case there are no tasks to run.
- listenerBus.post(SparkListenerStageCompleted(stage.info))
+ listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
runningStages -= stage
@@ -892,8 +904,9 @@ class DAGScheduler(
// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
- listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
- event.taskMetrics))
+ val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+ listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,
+ event.taskInfo, event.taskMetrics))
}
if (!stageIdToStage.contains(task.stageId)) {
@@ -902,14 +915,19 @@ class DAGScheduler(
}
val stage = stageIdToStage(task.stageId)
- def markStageAsFinished(stage: Stage) = {
- val serviceTime = stage.info.submissionTime match {
+ def markStageAsFinished(stage: Stage, errorMessage: Option[String] = None) = {
+ val serviceTime = stage.latestInfo.submissionTime match {
case Some(t) => "%.03f".format((clock.getTime() - t) / 1000.0)
case _ => "Unknown"
}
- logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
- stage.info.completionTime = Some(clock.getTime())
- listenerBus.post(SparkListenerStageCompleted(stage.info))
+ if (errorMessage.isEmpty) {
+ logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
+ stage.latestInfo.completionTime = Some(clock.getTime())
+ } else {
+ stage.latestInfo.stageFailed(errorMessage.get)
+ logInfo("%s (%s) failed in %s s".format(stage, stage.name, serviceTime))
+ }
+ listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
runningStages -= stage
}
event.reason match {
@@ -924,7 +942,7 @@ class DAGScheduler(
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
- stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
+ stage.latestInfo.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
@@ -935,8 +953,8 @@ class DAGScheduler(
logError(s"Failed to update accumulators for $task", e)
}
}
- listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
- event.taskMetrics))
+ listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
+ event.reason, event.taskInfo, event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
@@ -1029,6 +1047,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
+ markStageAsFinished(failedStage, Some("Fetch failure"))
runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
@@ -1142,7 +1161,7 @@ class DAGScheduler(
}
val dependentJobs: Seq[ActiveJob] =
activeJobs.filter(job => stageDependsOn(job.finalStage, failedStage)).toSeq
- failedStage.info.completionTime = Some(clock.getTime())
+ failedStage.latestInfo.completionTime = Some(clock.getTime())
for (job <- dependentJobs) {
failJobAndIndependentStages(job, s"Job aborted due to stage failure: $reason")
}
@@ -1182,8 +1201,8 @@ class DAGScheduler(
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
- stage.info.stageFailed(failureReason)
- listenerBus.post(SparkListenerStageCompleted(stage.info))
+ stage.latestInfo.stageFailed(failureReason)
+ listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index d01d318633..86ca8445a1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -39,7 +39,8 @@ case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Propert
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
+case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
+ extends SparkListenerEvent
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
@@ -47,6 +48,7 @@ case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListe
@DeveloperApi
case class SparkListenerTaskEnd(
stageId: Int,
+ stageAttemptId: Int,
taskType: String,
reason: TaskEndReason,
taskInfo: TaskInfo,
@@ -75,10 +77,15 @@ case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
+/**
+ * Periodic updates from executors.
+ * @param execId executor id
+ * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
+ */
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
execId: String,
- taskMetrics: Seq[(Long, Int, TaskMetrics)])
+ taskMetrics: Seq[(Long, Int, Int, TaskMetrics)])
extends SparkListenerEvent
@DeveloperApi
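A hedged illustration of the widened taskMetrics tuple documented above (the class name is hypothetical; it assumes the post-patch event shape):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Sketch only: unpack the (taskId, stageId, stageAttemptId, metrics) tuples
// that SparkListenerExecutorMetricsUpdate carries after this patch.
class MetricsUpdateLogger extends SparkListener {
  override def onExecutorMetricsUpdate(update: SparkListenerExecutorMetricsUpdate) {
    for ((taskId, stageId, stageAttemptId, metrics) <- update.taskMetrics) {
      println(s"executor ${update.execId}: task $taskId in stage $stageId " +
        s"(attempt $stageAttemptId) has run for ${metrics.executorRunTime} ms")
    }
  }
}
```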
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 800905413d..071568cdfb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -43,6 +43,9 @@ import org.apache.spark.util.CallSite
* stage, the callSite gives the user code that created the RDD being shuffled. For a result
* stage, the callSite gives the user code that executes the associated action (e.g. count()).
*
+ * A single stage can consist of multiple attempts. In that case, the latestInfo field will
+ * be updated for each attempt.
+ *
*/
private[spark] class Stage(
val id: Int,
@@ -71,8 +74,8 @@ private[spark] class Stage(
val name = callSite.shortForm
val details = callSite.longForm
- /** Pointer to the [StageInfo] object, set by DAGScheduler. */
- var info: StageInfo = StageInfo.fromStage(this)
+ /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
+ var latestInfo: StageInfo = StageInfo.fromStage(this)
def isAvailable: Boolean = {
if (!isShuffleMap) {
@@ -116,6 +119,7 @@ private[spark] class Stage(
}
}
+ /** Return a new attempt id, starting with 0. */
def newAttemptId(): Int = {
val id = nextAttemptId
nextAttemptId += 1
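To make the attempt-id contract above concrete, here is a tiny standalone illustration (toy REPL code, not Spark internals) of the counter and the doc comment's claim that ids start at 0:

```scala
// Toy illustration of the contract documented above: attempt ids start at 0
// and grow by one each time the stage is resubmitted.
class ToyStage {
  private var nextAttemptId = 0
  def newAttemptId(): Int = {
    val id = nextAttemptId
    nextAttemptId += 1
    id
  }
}

val stage = new ToyStage
assert(stage.newAttemptId() == 0)  // first submission
assert(stage.newAttemptId() == 1)  // first retry (shown as "(retry 1)" in the UI)
```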
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 2a407e47a0..c6dc3369ba 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.storage.RDDInfo
@DeveloperApi
class StageInfo(
val stageId: Int,
+ val attemptId: Int,
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
@@ -56,9 +57,15 @@ private[spark] object StageInfo {
* shuffle dependencies. Therefore, all ancestor RDDs related to this Stage's RDD through a
* sequence of narrow dependencies should also be associated with this Stage.
*/
- def fromStage(stage: Stage): StageInfo = {
+ def fromStage(stage: Stage, numTasks: Option[Int] = None): StageInfo = {
val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
- new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
+ new StageInfo(
+ stage.id,
+ stage.attemptId,
+ stage.name,
+ numTasks.getOrElse(stage.numTasks),
+ rddInfos,
+ stage.details)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 6c0d1b2752..ad051e59af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -333,12 +333,12 @@ private[spark] class TaskSchedulerImpl(
execId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId): Boolean = {
- val metricsWithStageIds = taskMetrics.flatMap {
- case (id, metrics) => {
+
+ val metricsWithStageIds: Array[(Long, Int, Int, TaskMetrics)] = synchronized {
+ taskMetrics.flatMap { case (id, metrics) =>
taskIdToTaskSetId.get(id)
.flatMap(activeTaskSets.get)
- .map(_.stageId)
- .map(x => (id, x, metrics))
+ .map(taskSetMgr => (id, taskSetMgr.stageId, taskSetMgr.taskSet.attempt, metrics))
}
}
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 613fa7850b..c3ad325156 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -31,9 +31,5 @@ private[spark] class TaskSet(
val properties: Properties) {
val id: String = stageId + "." + attempt
- def kill(interruptThread: Boolean) {
- tasks.foreach(_.kill(interruptThread))
- }
-
override def toString: String = "TaskSet " + id
}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 0cc51c8737..2987dc0449 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -24,8 +24,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.ui.jobs.UIData.StageUIData
import org.apache.spark.util.Utils
-/** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+/** Stage summary grouped by executors. */
+private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
private val listener = parent.listener
def toNodeSeq: Seq[Node] = {
@@ -65,7 +65,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
executorIdToAddress.put(executorId, address)
}
- listener.stageIdToData.get(stageId) match {
+ listener.stageIdToData.get((stageId, stageAttemptId)) match {
case Some(stageData: StageUIData) =>
stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
<tr>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 74cd637d88..f7f918fd52 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -43,12 +43,16 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// How many stages to remember
val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
- val activeStages = HashMap[Int, StageInfo]()
+ // Map from stageId to StageInfo
+ val activeStages = new HashMap[Int, StageInfo]
+
+ // Map from (stageId, attemptId) to StageUIData
+ val stageIdToData = new HashMap[(Int, Int), StageUIData]
+
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
- val stageIdToData = new HashMap[Int, StageUIData]
-
+ // Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -59,9 +63,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
- val stageId = stage.stageId
- val stageData = stageIdToData.getOrElseUpdate(stageId, {
- logWarning("Stage completed for unknown stage " + stageId)
+ val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
+ logWarning("Stage completed for unknown stage " + stage.stageId)
new StageUIData
})
@@ -69,8 +72,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageData.accumulables(id) = info
}
- poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
- activeStages.remove(stageId)
+ poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
+ hashMap.remove(stage.stageId)
+ }
+ activeStages.remove(stage.stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
trimIfNecessary(completedStages)
@@ -84,7 +89,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
- stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
+ stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
stages.trimStart(toRemove)
}
}
@@ -98,21 +103,21 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
- val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+ val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName
stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
- val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
+ val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
stages(stage.stageId) = stage
}
override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
- val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+ val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
@@ -128,8 +133,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
val info = taskEnd.taskInfo
- if (info != null) {
- val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+ // If the stage attempt id is -1, it means the DAGScheduler had no idea which attempt this
+ // task completion event is for. Just drop it here. This means the web UI may show some
+ // speculative tasks that are never marked as complete.
+ if (info != null && taskEnd.stageAttemptId != -1) {
+ val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})
@@ -222,8 +230,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
- for ((taskId, sid, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
- val stageData = stageIdToData.getOrElseUpdate(sid, {
+ for ((taskId, sid, sAttempt, taskMetrics) <- executorMetricsUpdate.taskMetrics) {
+ val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
new StageUIData
})
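A standalone sketch of the bookkeeping pattern this file now uses (toy types, not the real StageUIData): UI state is keyed by (stageId, attemptId) so retries get separate rows, and task-end events carrying the -1 sentinel are dropped:

```scala
import scala.collection.mutable

// Toy sketch of the pattern above: key per-attempt UI state by the pair
// (stageId, attemptId) and ignore events whose attempt is the -1 sentinel.
class ToyStageUIData { var numCompleteTasks = 0 }

val stageIdToData = new mutable.HashMap[(Int, Int), ToyStageUIData]

def recordTaskEnd(stageId: Int, stageAttemptId: Int) {
  if (stageAttemptId != -1) {
    val data = stageIdToData.getOrElseUpdate((stageId, stageAttemptId), new ToyStageUIData)
    data.numCompleteTasks += 1
  }
  // else: the scheduler didn't know which attempt this was; drop the event,
  // mirroring what onTaskEnd above does.
}

recordTaskEnd(0, 0)
recordTaskEnd(0, 1)   // a retry of stage 0 gets its own entry
recordTaskEnd(3, -1)  // dropped
assert(stageIdToData.size == 2)
```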
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index d4eb02722a..db01be596e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -34,7 +34,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt
- val stageDataOption = listener.stageIdToData.get(stageId)
+ val stageAttemptId = request.getParameter("attempt").toInt
+ val stageDataOption = listener.stageIdToData.get((stageId, stageAttemptId))
if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
val content =
@@ -42,14 +43,15 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
<h4>Summary Metrics</h4> No tasks have started yet
<h4>Tasks</h4> No tasks have started yet
</div>
- return UIUtils.headerSparkPage("Details for Stage %s".format(stageId), content, parent)
+ return UIUtils.headerSparkPage(
+ s"Details for Stage $stageId (Attempt $stageAttemptId)", content, parent)
}
val stageData = stageDataOption.get
val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
val numCompleted = tasks.count(_.taskInfo.finished)
- val accumulables = listener.stageIdToData(stageId).accumulables
+ val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables
val hasInput = stageData.inputBytes > 0
val hasShuffleRead = stageData.shuffleReadBytes > 0
val hasShuffleWrite = stageData.shuffleWriteBytes > 0
@@ -211,7 +213,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
def quantileRow(data: Seq[Node]): Seq[Node] = <tr>{data}</tr>
Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
- val executorTable = new ExecutorTable(stageId, parent)
+
+ val executorTable = new ExecutorTable(stageId, stageAttemptId, parent)
val maybeAccumulableTable: Seq[Node] =
if (accumulables.size > 0) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
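With this change, StagePage requires an `attempt` query parameter alongside `id`, matching the per-attempt links StageTable generates below. A hypothetical example (host and ids made up) of the URL for the first retry of stage 5 on the default UI port:

```
http://<driver-host>:4040/stages/stage?id=5&attempt=1
```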
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 16ad0df45a..2e67310594 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -97,8 +97,8 @@ private[ui] class StageTableBase(
}
// scalastyle:on
- val nameLinkUri ="%s/stages/stage?id=%s"
- .format(UIUtils.prependBaseUri(parent.basePath), s.stageId)
+ val nameLinkUri ="%s/stages/stage?id=%s&attempt=%s"
+ .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId)
val nameLink = <a href={nameLinkUri}>{s.name}</a>
val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
@@ -121,7 +121,7 @@ private[ui] class StageTableBase(
}
val stageDesc = for {
- stageData <- listener.stageIdToData.get(s.stageId)
+ stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
desc <- stageData.description
} yield {
<div><em>{desc}</em></div>
@@ -131,7 +131,7 @@ private[ui] class StageTableBase(
}
protected def stageRow(s: StageInfo): Seq[Node] = {
- val stageDataOption = listener.stageIdToData.get(s.stageId)
+ val stageDataOption = listener.stageIdToData.get((s.stageId, s.attemptId))
if (stageDataOption.isEmpty) {
return <td>{s.stageId}</td><td>No data available for this stage</td>
}
@@ -154,7 +154,11 @@ private[ui] class StageTableBase(
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
- <td>{s.stageId}</td> ++
+ {if (s.attemptId > 0) {
+ <td>{s.stageId} (retry {s.attemptId})</td>
+ } else {
+ <td>{s.stageId}</td>
+ }} ++
{if (isFairScheduler) {
<td>
<a href={"%s/stages/pool?poolname=%s"
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1e18ec688c..db7384705f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -96,6 +96,7 @@ private[spark] object JsonProtocol {
val taskInfo = taskStart.taskInfo
("Event" -> Utils.getFormattedClassName(taskStart)) ~
("Stage ID" -> taskStart.stageId) ~
+ ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
("Task Info" -> taskInfoToJson(taskInfo))
}
@@ -112,6 +113,7 @@ private[spark] object JsonProtocol {
val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
("Event" -> Utils.getFormattedClassName(taskEnd)) ~
("Stage ID" -> taskEnd.stageId) ~
+ ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
("Task Type" -> taskEnd.taskType) ~
("Task End Reason" -> taskEndReason) ~
("Task Info" -> taskInfoToJson(taskInfo)) ~
@@ -187,6 +189,7 @@ private[spark] object JsonProtocol {
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
("Stage ID" -> stageInfo.stageId) ~
+ ("Stage Attempt ID" -> stageInfo.attemptId) ~
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
@@ -419,8 +422,9 @@ private[spark] object JsonProtocol {
def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
val stageId = (json \ "Stage ID").extract[Int]
+ val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val taskInfo = taskInfoFromJson(json \ "Task Info")
- SparkListenerTaskStart(stageId, taskInfo)
+ SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
}
def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
@@ -430,11 +434,12 @@ private[spark] object JsonProtocol {
def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
val stageId = (json \ "Stage ID").extract[Int]
+ val stageAttemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val taskType = (json \ "Task Type").extract[String]
val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
val taskInfo = taskInfoFromJson(json \ "Task Info")
val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
- SparkListenerTaskEnd(stageId, taskType, taskEndReason, taskInfo, taskMetrics)
+ SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo, taskMetrics)
}
def jobStartFromJson(json: JValue): SparkListenerJobStart = {
@@ -492,6 +497,7 @@ private[spark] object JsonProtocol {
def stageInfoFromJson(json: JValue): StageInfo = {
val stageId = (json \ "Stage ID").extract[Int]
+ val attemptId = (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson(_))
@@ -504,7 +510,7 @@ private[spark] object JsonProtocol {
case None => Seq[AccumulableInfo]()
}
- val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
+ val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
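One detail worth calling out: the readers use `extractOpt[Int].getOrElse(0)` so that event logs written before this patch, which lack the new attempt fields, still parse. A minimal json4s sketch of that fallback (standalone illustration, not patch code):

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

implicit val formats = DefaultFormats

// Events logged before this patch have no "Stage Attempt ID"; default to 0.
def stageAttemptId(json: JValue): Int =
  (json \ "Stage Attempt ID").extractOpt[Int].getOrElse(0)

val oldEvent = parse("""{"Event": "SparkListenerTaskStart", "Stage ID": 111}""")
val newEvent = parse("""{"Event": "SparkListenerTaskStart", "Stage ID": 111, "Stage Attempt ID": 2}""")

assert(stageAttemptId(oldEvent) == 0)  // missing field falls back to attempt 0
assert(stageAttemptId(newEvent) == 2)
```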
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 51fb646a3c..7671cb969a 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -69,10 +69,10 @@ class StorageStatusListenerSuite extends FunSuite {
// Task end with no updated blocks
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics))
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
}
@@ -92,13 +92,13 @@ class StorageStatusListenerSuite extends FunSuite {
// Task end with new blocks
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").numBlocks === 2)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -111,13 +111,14 @@ class StorageStatusListenerSuite extends FunSuite {
val droppedBlock3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.NONE, 0L, 0L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(droppedBlock1, droppedBlock3))
taskMetrics2.updatedBlocks = Some(Seq(droppedBlock2, droppedBlock3))
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
+
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 1)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
assert(listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 2)))
assert(listener.executorIdToStorageStatus("fat").containsBlock(RDDBlockId(4, 0)))
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo2, taskMetrics2))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo2, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").numBlocks === 1)
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
assert(!listener.executorIdToStorageStatus("big").containsBlock(RDDBlockId(1, 1)))
@@ -135,8 +136,8 @@ class StorageStatusListenerSuite extends FunSuite {
val block3 = (RDDBlockId(4, 0), BlockStatus(StorageLevel.DISK_ONLY, 0L, 300L, 0L))
taskMetrics1.updatedBlocks = Some(Seq(block1, block2))
taskMetrics2.updatedBlocks = Some(Seq(block3))
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics1))
- listener.onTaskEnd(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo1, taskMetrics2))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics1))
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo1, taskMetrics2))
assert(listener.executorIdToStorageStatus("big").numBlocks === 3)
// Unpersist RDD
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 147ec0bc52..3370dd4156 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val listener = new JobProgressListener(conf)
def createStageStartEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
SparkListenerStageSubmitted(stageInfo)
}
def createStageEndEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
SparkListenerStageCompleted(stageInfo)
}
@@ -70,33 +70,37 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0)
val taskType = Utils.getFormattedClassName(task)
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
- .shuffleRead === 1000)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToData.getOrElse((0, 0), fail())
+ .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
- .shuffleRead === 2000)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToData.getOrElse((0, 0), fail())
+ .executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000)
// finish this task, should get updated duration
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
- assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
- .shuffleRead === 1000)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ assert(listener.stageIdToData.getOrElse((0, 0), fail())
+ .executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
@@ -119,16 +123,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
UnknownReason)
var failCount = 0
for (reason <- taskFailedReasons) {
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics))
failCount += 1
- assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
- assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+ assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0)
+ assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
}
// Make sure we count success as success.
- listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
- assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
- assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
+ assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1)
+ assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
}
test("test update metrics") {
@@ -163,18 +169,18 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
taskInfo
}
- listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1234L)))
- listener.onTaskStart(SparkListenerTaskStart(0, makeTaskInfo(1235L)))
- listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1236L)))
- listener.onTaskStart(SparkListenerTaskStart(1, makeTaskInfo(1237L)))
+ listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L)))
+ listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L)))
+ listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
+ listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
- (1234L, 0, makeTaskMetrics(0)),
- (1235L, 0, makeTaskMetrics(100)),
- (1236L, 1, makeTaskMetrics(200)))))
+ (1234L, 0, 0, makeTaskMetrics(0)),
+ (1235L, 0, 0, makeTaskMetrics(100)),
+ (1236L, 1, 0, makeTaskMetrics(200)))))
- var stage0Data = listener.stageIdToData.get(0).get
- var stage1Data = listener.stageIdToData.get(1).get
+ var stage0Data = listener.stageIdToData.get((0, 0)).get
+ var stage1Data = listener.stageIdToData.get((1, 0)).get
assert(stage0Data.shuffleReadBytes == 102)
assert(stage1Data.shuffleReadBytes == 201)
assert(stage0Data.shuffleWriteBytes == 106)
@@ -195,14 +201,14 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
.totalBlocksFetched == 202)
// task that was included in a heartbeat
- listener.onTaskEnd(SparkListenerTaskEnd(0, taskType, Success, makeTaskInfo(1234L, 1),
+ listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
makeTaskMetrics(300)))
// task that wasn't included in a heartbeat
- listener.onTaskEnd(SparkListenerTaskEnd(1, taskType, Success, makeTaskInfo(1237L, 1),
+ listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
makeTaskMetrics(400)))
- stage0Data = listener.stageIdToData.get(0).get
- stage1Data = listener.stageIdToData.get(1).get
+ stage0Data = listener.stageIdToData.get((0, 0)).get
+ stage1Data = listener.stageIdToData.get((1, 0)).get
assert(stage0Data.shuffleReadBytes == 402)
assert(stage1Data.shuffleReadBytes == 602)
assert(stage0Data.shuffleWriteBytes == 406)
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 6e68dcb342..b860177705 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -53,7 +53,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.isEmpty)
// 2 RDDs are known, but none are cached
- val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
+ val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 2)
assert(storageListener.rddInfoList.isEmpty)
@@ -63,7 +63,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val rddInfo3Cached = rddInfo3
rddInfo2Cached.numCachedPartitions = 1
rddInfo3Cached.numCachedPartitions = 1
- val stageInfo1 = new StageInfo(1, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
+ val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
assert(storageListener._rddInfoMap.size === 4)
assert(storageListener.rddInfoList.size === 2)
@@ -71,7 +71,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
// Submitting RDDInfos with duplicate IDs does nothing
val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY)
rddInfo0Cached.numCachedPartitions = 1
- val stageInfo0Cached = new StageInfo(0, "0", 100, Seq(rddInfo0), "details")
+ val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
assert(storageListener._rddInfoMap.size === 4)
assert(storageListener.rddInfoList.size === 2)
@@ -87,7 +87,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val rddInfo1Cached = rddInfo1
rddInfo0Cached.numCachedPartitions = 1
rddInfo1Cached.numCachedPartitions = 1
- val stageInfo0 = new StageInfo(0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
+ val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 2)
assert(storageListener.rddInfoList.size === 2)
@@ -106,7 +106,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo0 = rddInfo0
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
- val stageInfo0 = new StageInfo(0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
+ val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
@@ -116,7 +116,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
assert(!storageListener._rddInfoMap(2).isCached)
// Task end with no updated blocks. This should not change anything.
- bus.postToAll(SparkListenerTaskEnd(0, "obliteration", Success, taskInfo, new TaskMetrics))
+ bus.postToAll(SparkListenerTaskEnd(0, 0, "obliteration", Success, taskInfo, new TaskMetrics))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0)
@@ -128,7 +128,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
(RDDBlockId(0, 102), BlockStatus(memAndDisk, 400L, 0L, 200L)),
(RDDBlockId(1, 20), BlockStatus(memAndDisk, 0L, 240L, 0L))
))
- bus.postToAll(SparkListenerTaskEnd(1, "obliteration", Success, taskInfo, metrics1))
+ bus.postToAll(SparkListenerTaskEnd(1, 0, "obliteration", Success, taskInfo, metrics1))
assert(storageListener._rddInfoMap(0).memSize === 800L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
@@ -150,7 +150,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
(RDDBlockId(2, 40), BlockStatus(none, 0L, 0L, 0L)), // doesn't actually exist
(RDDBlockId(4, 80), BlockStatus(none, 0L, 0L, 0L)) // doesn't actually exist
))
- bus.postToAll(SparkListenerTaskEnd(2, "obliteration", Success, taskInfo, metrics2))
+ bus.postToAll(SparkListenerTaskEnd(2, 0, "obliteration", Success, taskInfo, metrics2))
assert(storageListener._rddInfoMap(0).memSize === 400L)
assert(storageListener._rddInfoMap(0).diskSize === 400L)
assert(storageListener._rddInfoMap(0).tachyonSize === 200L)
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 97ffb07662..2fd3b9cfd2 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -35,13 +35,13 @@ class JsonProtocolSuite extends FunSuite {
val stageSubmitted =
SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
- val taskStart = SparkListenerTaskStart(111, makeTaskInfo(222L, 333, 1, 444L, false))
+ val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 444L, false))
val taskGettingResult =
SparkListenerTaskGettingResult(makeTaskInfo(1000L, 2000, 5, 3000L, true))
- val taskEnd = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ val taskEnd = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
makeTaskInfo(123L, 234, 67, 345L, false),
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = false))
- val taskEndWithHadoopInput = SparkListenerTaskEnd(1, "ShuffleMapTask", Success,
+ val taskEndWithHadoopInput = SparkListenerTaskEnd(1, 0, "ShuffleMapTask", Success,
makeTaskInfo(123L, 234, 67, 345L, false),
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true))
val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
@@ -397,7 +397,8 @@ class JsonProtocolSuite extends FunSuite {
private def assertJsonStringEquals(json1: String, json2: String) {
val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
- assert(formatJsonString(json1) === formatJsonString(json2))
+ assert(formatJsonString(json1) === formatJsonString(json2),
+ s"input ${formatJsonString(json1)} got ${formatJsonString(json2)}")
}
private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
@@ -485,7 +486,7 @@ class JsonProtocolSuite extends FunSuite {
private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
- val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
+ val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
stageInfo.accumulables(acc1.id) = acc1
stageInfo.accumulables(acc2.id) = acc2
@@ -558,84 +559,246 @@ class JsonProtocolSuite extends FunSuite {
private val stageSubmittedJsonString =
"""
- {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
- "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
- "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]},"Properties":
- {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+ |{
+ | "Event": "SparkListenerStageSubmitted",
+ | "Stage Info": {
+ | "Stage ID": 100,
+ | "Stage Attempt ID": 0,
+ | "Stage Name": "greetings",
+ | "Number of Tasks": 200,
+ | "RDD Info": [],
+ | "Details": "details",
+ | "Accumulables": [
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | }
+ | ]
+ | },
+ | "Properties": {
+ | "France": "Paris",
+ | "Germany": "Berlin",
+ | "Russia": "Moscow",
+ | "Ukraine": "Kiev"
+ | }
+ |}
"""
private val stageCompletedJsonString =
"""
- {"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":101,"Stage Name":
- "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
- Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
- "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
- "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
- "Accumulables":[{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- {"ID":1,"Name":"Accumulable1","Update":"delta1","Value":"val1"}]}}
+ |{
+ | "Event": "SparkListenerStageCompleted",
+ | "Stage Info": {
+ | "Stage ID": 101,
+ | "Stage Attempt ID": 0,
+ | "Stage Name": "greetings",
+ | "Number of Tasks": 201,
+ | "RDD Info": [
+ | {
+ | "RDD ID": 101,
+ | "Name": "mayor",
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
+ | "Use Tachyon": false,
+ | "Deserialized": true,
+ | "Replication": 1
+ | },
+ | "Number of Partitions": 201,
+ | "Number of Cached Partitions": 301,
+ | "Memory Size": 401,
+ | "Tachyon Size": 0,
+ | "Disk Size": 501
+ | }
+ | ],
+ | "Details": "details",
+ | "Accumulables": [
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | }
+ | ]
+ | }
+ |}
"""
private val taskStartJsonString =
"""
- |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
- |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
- |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
- |"Failed":false,"Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
- |"Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- |{"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]}}
+ |{
+ | "Event": "SparkListenerTaskStart",
+ | "Stage ID": 111,
+ | "Stage Attempt ID": 0,
+ | "Task Info": {
+ | "Task ID": 222,
+ | "Index": 333,
+ | "Attempt": 1,
+ | "Launch Time": 444,
+ | "Executor ID": "executor",
+ | "Host": "your kind sir",
+ | "Locality": "NODE_LOCAL",
+ | "Speculative": false,
+ | "Getting Result Time": 0,
+ | "Finish Time": 0,
+ | "Failed": false,
+ | "Accumulables": [
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "Accumulable3",
+ | "Update": "delta3",
+ | "Value": "val3"
+ | }
+ | ]
+ | }
+ |}
""".stripMargin
private val taskGettingResultJsonString =
"""
- |{"Event":"SparkListenerTaskGettingResult","Task Info":
- | {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
- | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
- | "Finish Time":0,"Failed":false,
- | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
- | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
+ |{
+ | "Event": "SparkListenerTaskGettingResult",
+ | "Task Info": {
+ | "Task ID": 1000,
+ | "Index": 2000,
+ | "Attempt": 5,
+ | "Launch Time": 3000,
+ | "Executor ID": "executor",
+ | "Host": "your kind sir",
+ | "Locality": "NODE_LOCAL",
+ | "Speculative": true,
+ | "Getting Result Time": 0,
+ | "Finish Time": 0,
+ | "Failed": false,
+ | "Accumulables": [
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "Accumulable3",
+ | "Update": "delta3",
+ | "Value": "val3"
+ | }
+ | ]
| }
|}
""".stripMargin
private val taskEndJsonString =
"""
- |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
- |"Task End Reason":{"Reason":"Success"},
- |"Task Info":{
- | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
- | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false,
- | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
- | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
- |},
- |"Task Metrics":{
- | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
- | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
- | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
- | "Shuffle Read Metrics":{
- | "Shuffle Finish Time":900,
- | "Remote Blocks Fetched":800,
- | "Local Blocks Fetched":700,
- | "Fetch Wait Time":900,
- | "Remote Bytes Read":1000
+ |{
+ | "Event": "SparkListenerTaskEnd",
+ | "Stage ID": 1,
+ | "Stage Attempt ID": 0,
+ | "Task Type": "ShuffleMapTask",
+ | "Task End Reason": {
+ | "Reason": "Success"
| },
- | "Shuffle Write Metrics":{
- | "Shuffle Bytes Written":1200,
- | "Shuffle Write Time":1500
+ | "Task Info": {
+ | "Task ID": 123,
+ | "Index": 234,
+ | "Attempt": 67,
+ | "Launch Time": 345,
+ | "Executor ID": "executor",
+ | "Host": "your kind sir",
+ | "Locality": "NODE_LOCAL",
+ | "Speculative": false,
+ | "Getting Result Time": 0,
+ | "Finish Time": 0,
+ | "Failed": false,
+ | "Accumulables": [
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "Accumulable3",
+ | "Update": "delta3",
+ | "Value": "val3"
+ | }
+ | ]
| },
- | "Updated Blocks":[
- | {"Block ID":"rdd_0_0",
- | "Status":{
- | "Storage Level":{
- | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
- | "Replication":2
- | },
- | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ | "Task Metrics": {
+ | "Host Name": "localhost",
+ | "Executor Deserialize Time": 300,
+ | "Executor Run Time": 400,
+ | "Result Size": 500,
+ | "JVM GC Time": 600,
+ | "Result Serialization Time": 700,
+ | "Memory Bytes Spilled": 800,
+ | "Disk Bytes Spilled": 0,
+ | "Shuffle Read Metrics": {
+ | "Shuffle Finish Time": 900,
+ | "Remote Blocks Fetched": 800,
+ | "Local Blocks Fetched": 700,
+ | "Fetch Wait Time": 900,
+ | "Remote Bytes Read": 1000
+ | },
+ | "Shuffle Write Metrics": {
+ | "Shuffle Bytes Written": 1200,
+ | "Shuffle Write Time": 1500
+ | },
+ | "Updated Blocks": [
+ | {
+ | "Block ID": "rdd_0_0",
+ | "Status": {
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
+ | "Use Tachyon": false,
+ | "Deserialized": false,
+ | "Replication": 2
+ | },
+ | "Memory Size": 0,
+ | "Tachyon Size": 0,
+ | "Disk Size": 0
+ | }
| }
- | }
| ]
| }
|}
@@ -643,80 +806,187 @@ class JsonProtocolSuite extends FunSuite {
private val taskEndWithHadoopInputJsonString =
"""
- |{"Event":"SparkListenerTaskEnd","Stage ID":1,"Task Type":"ShuffleMapTask",
- |"Task End Reason":{"Reason":"Success"},
- |"Task Info":{
- | "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
- | "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
- | "Getting Result Time":0,"Finish Time":0,"Failed":false,
- | "Accumulables":[{"ID":1,"Name":"Accumulable1","Update":"delta1",
- | "Value":"val1"},{"ID":2,"Name":"Accumulable2","Update":"delta2","Value":"val2"},
- | {"ID":3,"Name":"Accumulable3","Update":"delta3","Value":"val3"}]
- |},
- |"Task Metrics":{
- | "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
- | "Result Size":500,"JVM GC Time":600,"Result Serialization Time":700,
- | "Memory Bytes Spilled":800,"Disk Bytes Spilled":0,
- | "Shuffle Write Metrics":{"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},
- | "Input Metrics":{"Data Read Method":"Hadoop","Bytes Read":2100},
- | "Updated Blocks":[
- | {"Block ID":"rdd_0_0",
- | "Status":{
- | "Storage Level":{
- | "Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false,
- | "Replication":2
- | },
- | "Memory Size":0,"Tachyon Size":0,"Disk Size":0
+ |{
+ | "Event": "SparkListenerTaskEnd",
+ | "Stage ID": 1,
+ | "Stage Attempt ID": 0,
+ | "Task Type": "ShuffleMapTask",
+ | "Task End Reason": {
+ | "Reason": "Success"
+ | },
+ | "Task Info": {
+ | "Task ID": 123,
+ | "Index": 234,
+ | "Attempt": 67,
+ | "Launch Time": 345,
+ | "Executor ID": "executor",
+ | "Host": "your kind sir",
+ | "Locality": "NODE_LOCAL",
+ | "Speculative": false,
+ | "Getting Result Time": 0,
+ | "Finish Time": 0,
+ | "Failed": false,
+ | "Accumulables": [
+ | {
+ | "ID": 1,
+ | "Name": "Accumulable1",
+ | "Update": "delta1",
+ | "Value": "val1"
+ | },
+ | {
+ | "ID": 2,
+ | "Name": "Accumulable2",
+ | "Update": "delta2",
+ | "Value": "val2"
+ | },
+ | {
+ | "ID": 3,
+ | "Name": "Accumulable3",
+ | "Update": "delta3",
+ | "Value": "val3"
| }
- | }
- | ]}
+ | ]
+ | },
+ | "Task Metrics": {
+ | "Host Name": "localhost",
+ | "Executor Deserialize Time": 300,
+ | "Executor Run Time": 400,
+ | "Result Size": 500,
+ | "JVM GC Time": 600,
+ | "Result Serialization Time": 700,
+ | "Memory Bytes Spilled": 800,
+ | "Disk Bytes Spilled": 0,
+ | "Shuffle Write Metrics": {
+ | "Shuffle Bytes Written": 1200,
+ | "Shuffle Write Time": 1500
+ | },
+ | "Input Metrics": {
+ | "Data Read Method": "Hadoop",
+ | "Bytes Read": 2100
+ | },
+ | "Updated Blocks": [
+ | {
+ | "Block ID": "rdd_0_0",
+ | "Status": {
+ | "Storage Level": {
+ | "Use Disk": true,
+ | "Use Memory": true,
+ | "Use Tachyon": false,
+ | "Deserialized": false,
+ | "Replication": 2
+ | },
+ | "Memory Size": 0,
+ | "Tachyon Size": 0,
+ | "Disk Size": 0
+ | }
+ | }
+ | ]
+ | }
|}
"""
private val jobStartJsonString =
"""
- {"Event":"SparkListenerJobStart","Job ID":10,"Stage IDs":[1,2,3,4],"Properties":
- {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
+ |{
+ | "Event": "SparkListenerJobStart",
+ | "Job ID": 10,
+ | "Stage IDs": [
+ | 1,
+ | 2,
+ | 3,
+ | 4
+ | ],
+ | "Properties": {
+ | "France": "Paris",
+ | "Germany": "Berlin",
+ | "Russia": "Moscow",
+ | "Ukraine": "Kiev"
+ | }
+ |}
"""
private val jobEndJsonString =
"""
- {"Event":"SparkListenerJobEnd","Job ID":20,"Job Result":{"Result":"JobSucceeded"}}
+ |{
+ | "Event": "SparkListenerJobEnd",
+ | "Job ID": 20,
+ | "Job Result": {
+ | "Result": "JobSucceeded"
+ | }
+ |}
"""
private val environmentUpdateJsonString =
"""
- {"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"GC speed":"9999 objects/s",
- "Java home":"Land of coffee"},"Spark Properties":{"Job throughput":"80000 jobs/s,
- regardless of job type"},"System Properties":{"Username":"guest","Password":"guest"},
- "Classpath Entries":{"Super library":"/tmp/super_library"}}
+ |{
+ | "Event": "SparkListenerEnvironmentUpdate",
+ | "JVM Information": {
+ | "GC speed": "9999 objects/s",
+ | "Java home": "Land of coffee"
+ | },
+ | "Spark Properties": {
+ | "Job throughput": "80000 jobs/s, regardless of job type"
+ | },
+ | "System Properties": {
+ | "Username": "guest",
+ | "Password": "guest"
+ | },
+ | "Classpath Entries": {
+ | "Super library": "/tmp/super_library"
+ | }
+ |}
"""
private val blockManagerAddedJsonString =
"""
- {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"Stars",
- "Host":"In your multitude...","Port":300,"Netty Port":400},"Maximum Memory":500}
+ |{
+ | "Event": "SparkListenerBlockManagerAdded",
+ | "Block Manager ID": {
+ | "Executor ID": "Stars",
+ | "Host": "In your multitude...",
+ | "Port": 300,
+ | "Netty Port": 400
+ | },
+ | "Maximum Memory": 500
+ |}
"""
private val blockManagerRemovedJsonString =
"""
- {"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"Scarce",
- "Host":"to be counted...","Port":100,"Netty Port":200}}
+ |{
+ | "Event": "SparkListenerBlockManagerRemoved",
+ | "Block Manager ID": {
+ | "Executor ID": "Scarce",
+ | "Host": "to be counted...",
+ | "Port": 100,
+ | "Netty Port": 200
+ | }
+ |}
"""
private val unpersistRDDJsonString =
"""
- {"Event":"SparkListenerUnpersistRDD","RDD ID":12345}
+ |{
+ | "Event": "SparkListenerUnpersistRDD",
+ | "RDD ID": 12345
+ |}
"""
private val applicationStartJsonString =
"""
- {"Event":"SparkListenerApplicationStart","App Name":"The winner of all","Timestamp":42,
- "User":"Garfield"}
+ |{
+ | "Event": "SparkListenerApplicationStart",
+ | "App Name": "The winner of all",
+ | "Timestamp": 42,
+ | "User": "Garfield"
+ |}
"""
private val applicationEndJsonString =
"""
- {"Event":"SparkListenerApplicationEnd","Timestamp":42}
+ |{
+ | "Event": "SparkListenerApplicationEnd",
+ | "Timestamp": 42
+ |}
"""
}