author     Xin Ren <iamshrek@126.com>             2015-12-08 11:44:51 -0600
committer  Imran Rashid <irashid@cloudera.com>    2015-12-08 11:46:46 -0600
commit     6cb06e8711fd6ac10c57faeb94bc323cae1cef27 (patch)
tree       2eb7479b50302d93e1532e26f7892f5c2974df7b /core
parent     e3735ce1602826f0a8e0ca9e08730923843449ee (diff)
[SPARK-11155][WEB UI] Stage summary json should include stage duration
The JSON endpoint for stages doesn't include the stage duration information that is present in the UI. This looks like a simple oversight; the fields should be included, e.g. at api/v1/applications/<appId>/stages. The metrics I've added are: submissionTime, firstTaskLaunchedTime and completionTime.

Author: Xin Ren <iamshrek@126.com>

Closes #10107 from keypointt/SPARK-11155.
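With these fields in place, a stage's wall-clock duration can be derived on the client side. A minimal sketch (the helper below is illustrative, not part of this patch), assuming the REST timestamps have already been parsed into java.util.Date:

    import java.util.Date

    // Hypothetical client-side helper: a duration is only defined once the
    // stage has both a submission time and a completion time.
    def stageDurationMs(submissionTime: Option[Date],
                        completionTime: Option[Date]): Option[Long] =
      for {
        submitted <- submissionTime
        completed <- completionTime
      } yield completed.getTime - submitted.getTime

For a stage that is still running, completionTime is absent and the helper returns None rather than a misleading partial duration.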
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala                      | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/status/api/v1/api.scala                                    |  3
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json     | 11
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json       |  5
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json       |  5
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json               |  5
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json              | 14
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json |  5
-rw-r--r--  core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json  |  5
-rw-r--r--  core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala                 | 62
10 files changed, 121 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 24a0b52206..31b4dd7c0f 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -17,8 +17,8 @@
package org.apache.spark.status.api.v1
import java.util.{Arrays, Date, List => JList}
-import javax.ws.rs.{GET, PathParam, Produces, QueryParam}
import javax.ws.rs.core.MediaType
+import javax.ws.rs.{GET, Produces, QueryParam}
import org.apache.spark.executor.{InputMetrics => InternalInputMetrics, OutputMetrics => InternalOutputMetrics, ShuffleReadMetrics => InternalShuffleReadMetrics, ShuffleWriteMetrics => InternalShuffleWriteMetrics, TaskMetrics => InternalTaskMetrics}
import org.apache.spark.scheduler.{AccumulableInfo => InternalAccumulableInfo, StageInfo}
@@ -59,6 +59,15 @@ private[v1] object AllStagesResource {
stageUiData: StageUIData,
includeDetails: Boolean): StageData = {
+ val taskLaunchTimes = stageUiData.taskData.values.map(_.taskInfo.launchTime).filter(_ > 0)
+
+ val firstTaskLaunchedTime: Option[Date] =
+ if (taskLaunchTimes.nonEmpty) {
+ Some(new Date(taskLaunchTimes.min))
+ } else {
+ None
+ }
+
val taskData = if (includeDetails) {
Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
} else {
@@ -92,6 +101,9 @@ private[v1] object AllStagesResource {
numCompleteTasks = stageUiData.numCompleteTasks,
numFailedTasks = stageUiData.numFailedTasks,
executorRunTime = stageUiData.executorRunTime,
+ submissionTime = stageInfo.submissionTime.map(new Date(_)),
+ firstTaskLaunchedTime,
+ completionTime = stageInfo.completionTime.map(new Date(_)),
inputBytes = stageUiData.inputBytes,
inputRecords = stageUiData.inputRecords,
outputBytes = stageUiData.outputBytes,
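The new firstTaskLaunchedTime above is the earliest positive launch time across the stage's tasks; tasks that have not launched yet carry a non-positive launchTime and are filtered out, so a stage with no launched tasks reports None. The same logic as a standalone sketch (names are illustrative, not part of the patch):

    import java.util.Date

    // Earliest launch time among tasks that actually launched (launchTime > 0),
    // or None if nothing has launched yet.
    def firstLaunched(launchTimes: Iterable[Long]): Option[Date] = {
      val launched = launchTimes.filter(_ > 0)
      if (launched.nonEmpty) Some(new Date(launched.min)) else None
    }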
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index baddfc50c1..5feb1dc2e5 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -120,6 +120,9 @@ class StageData private[spark](
val numFailedTasks: Int,
val executorRunTime: Long,
+ val submissionTime: Option[Date],
+ val firstTaskLaunchedTime: Option[Date],
+ val completionTime: Option[Date],
val inputBytes: Long,
val inputRecords: Long,
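In the expectation files that follow, the three new Option[Date] fields are rendered as strings like "2015-02-03T16:43:07.191GMT". A format that reproduces that rendering (an assumption about the serializer, shown only to make the fixtures below readable):

    import java.text.SimpleDateFormat
    import java.util.{Date, TimeZone}

    // Assumed pattern matching the fixture strings, e.g. "2015-02-03T16:43:07.191GMT".
    val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
    fmt.setTimeZone(TimeZone.getTimeZone("GMT"))
    fmt.format(new Date(1422981787191L))  // -> "2015-02-03T16:43:07.191GMT"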
diff --git a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
index 31ac9beea8..8f8067f86d 100644
--- a/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/complete_stage_list_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 162,
+ "submissionTime" : "2015-02-03T16:43:07.191GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:07.191GMT",
+ "completionTime" : "2015-02-03T16:43:07.226GMT",
"inputBytes" : 160,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -28,6 +31,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
+ "submissionTime" : "2015-02-03T16:43:05.829GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
+ "completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -50,6 +56,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 4338,
+ "submissionTime" : "2015-02-03T16:43:04.228GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:04.234GMT",
+ "completionTime" : "2015-02-03T16:43:04.819GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -64,4 +73,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line9.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:15)\n$line9.$read$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line9.$read$$iwC$$iwC.<init>(<console>:22)\n$line9.$read$$iwC.<init>(<console>:24)\n$line9.$read.<init>(<console>:26)\n$line9.$read$.<init>(<console>:30)\n$line9.$read$.<clinit>(<console>)\n$line9.$eval$.<init>(<console>:7)\n$line9.$eval$.<clinit>(<console>)\n$line9.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ] \ No newline at end of file
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
index bff6a4f69d..08b692eda8 100644
--- a/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/failed_stage_list_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 7,
"numFailedTasks" : 1,
"executorRunTime" : 278,
+ "submissionTime" : "2015-02-03T16:43:06.296GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:06.296GMT",
+ "completionTime" : "2015-02-03T16:43:06.347GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -20,4 +23,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ] \ No newline at end of file
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
index 111cb8163e..b07011d4f1 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_stage_attempt_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
+ "submissionTime" : "2015-02-03T16:43:05.829GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
+ "completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -267,4 +270,4 @@
"diskBytesSpilled" : 0
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
index ef339f89af..2f71520549 100644
--- a/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/one_stage_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
+ "submissionTime" : "2015-02-03T16:43:05.829GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
+ "completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -267,4 +270,4 @@
"diskBytesSpilled" : 0
}
}
-} ] \ No newline at end of file
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
index 056fac7088..5b957ed549 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_list_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 162,
+ "submissionTime" : "2015-02-03T16:43:07.191GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:07.191GMT",
+ "completionTime" : "2015-02-03T16:43:07.226GMT",
"inputBytes" : 160,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -28,6 +31,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 3476,
+ "submissionTime" : "2015-02-03T16:43:05.829GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:05.829GMT",
+ "completionTime" : "2015-02-03T16:43:06.286GMT",
"inputBytes" : 28000128,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -50,6 +56,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 4338,
+ "submissionTime" : "2015-02-03T16:43:04.228GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:04.234GMT",
+ "completionTime" : "2015-02-03T16:43:04.819GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -72,6 +81,9 @@
"numCompleteTasks" : 7,
"numFailedTasks" : 1,
"executorRunTime" : 278,
+ "submissionTime" : "2015-02-03T16:43:06.296GMT",
+ "firstTaskLaunchedTime" : "2015-02-03T16:43:06.296GMT",
+ "completionTime" : "2015-02-03T16:43:06.347GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -86,4 +98,4 @@
"details" : "org.apache.spark.rdd.RDD.count(RDD.scala:910)\n$line11.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:20)\n$line11.$read$$iwC$$iwC$$iwC.<init>(<console>:25)\n$line11.$read$$iwC$$iwC.<init>(<console>:27)\n$line11.$read$$iwC.<init>(<console>:29)\n$line11.$read.<init>(<console>:31)\n$line11.$read$.<init>(<console>:35)\n$line11.$read$.<clinit>(<console>)\n$line11.$eval$.<init>(<console>:7)\n$line11.$eval$.<clinit>(<console>)\n$line11.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:606)\norg.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)\norg.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)\norg.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)\norg.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)",
"schedulingPool" : "default",
"accumulatorUpdates" : [ ]
-} ] \ No newline at end of file
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
index 79ccacd309..afa425f8c2 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_list_with_accumulable_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 120,
+ "submissionTime" : "2015-03-16T19:25:36.103GMT",
+ "firstTaskLaunchedTime" : "2015-03-16T19:25:36.515GMT",
+ "completionTime" : "2015-03-16T19:25:36.579GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -24,4 +27,4 @@
"name" : "my counter",
"value" : "5050"
} ]
-} ] \ No newline at end of file
+} ]
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
index 32d5731676..12665a152c 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_with_accumulable_json_expectation.json
@@ -6,6 +6,9 @@
"numCompleteTasks" : 8,
"numFailedTasks" : 0,
"executorRunTime" : 120,
+ "submissionTime" : "2015-03-16T19:25:36.103GMT",
+ "firstTaskLaunchedTime" : "2015-03-16T19:25:36.515GMT",
+ "completionTime" : "2015-03-16T19:25:36.579GMT",
"inputBytes" : 0,
"inputRecords" : 0,
"outputBytes" : 0,
@@ -239,4 +242,4 @@
"diskBytesSpilled" : 0
}
}
-} \ No newline at end of file
+}
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
new file mode 100644
index 0000000000..88817dccf3
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/AllStagesResourceSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{StageInfo, TaskInfo, TaskLocality}
+import org.apache.spark.ui.jobs.UIData.{StageUIData, TaskUIData}
+
+class AllStagesResourceSuite extends SparkFunSuite {
+
+ def getFirstTaskLaunchTime(taskLaunchTimes: Seq[Long]): Option[Date] = {
+ val tasks = new HashMap[Long, TaskUIData]
+ taskLaunchTimes.zipWithIndex.foreach { case (time, idx) =>
+ tasks(idx.toLong) = new TaskUIData(
+ new TaskInfo(idx, idx, 1, time, "", "", TaskLocality.ANY, false), None, None)
+ }
+
+ val stageUiData = new StageUIData()
+ stageUiData.taskData = tasks
+ val status = StageStatus.ACTIVE
+ val stageInfo = new StageInfo(
+ 1, 1, "stage 1", 10, Seq.empty, Seq.empty, "details abc", Seq.empty)
+ val stageData = AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, false)
+
+ stageData.firstTaskLaunchedTime
+ }
+
+ test("firstTaskLaunchedTime when there are no tasks") {
+ val result = getFirstTaskLaunchTime(Seq())
+ assert(result == None)
+ }
+
+ test("firstTaskLaunchedTime when there are tasks but none launched") {
+ val result = getFirstTaskLaunchTime(Seq(-100L, -200L, -300L))
+ assert(result == None)
+ }
+
+ test("firstTaskLaunchedTime when there are tasks and some launched") {
+ val result = getFirstTaskLaunchTime(Seq(-100L, 1449255596000L, 1449255597000L))
+ assert(result == Some(new Date(1449255596000L)))
+ }
+
+}
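The suite exercises the three interesting cases for firstTaskLaunchedTime: no tasks at all, tasks that exist but have not launched (non-positive launch times), and a mix where only the launched tasks count toward the minimum. The behavior at a glance, restated with the standalone firstLaunched sketch from earlier (illustrative, not part of the patch):

    firstLaunched(Seq.empty)                    // None: no tasks
    firstLaunched(Seq(-100L, -200L, -300L))     // None: nothing launched yet
    firstLaunched(Seq(-100L, 1449255596000L))   // Some(new Date(1449255596000L))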