aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorDevaraj K <devaraj@apache.org>2016-05-30 14:29:27 -0700
committerKay Ousterhout <kayousterhout@gmail.com>2016-05-30 14:29:27 -0700
commit5b21139dbf3bd09cb3a590bd0ffb857ea92dc23c (patch)
treedaa81d932fd5cc1d3ababd6eb86affe27c941699 /streaming/src
parent2d34183b273af1125181f04c49725efc2fa351af (diff)
downloadspark-5b21139dbf3bd09cb3a590bd0ffb857ea92dc23c.tar.gz
spark-5b21139dbf3bd09cb3a590bd0ffb857ea92dc23c.tar.bz2
spark-5b21139dbf3bd09cb3a590bd0ffb857ea92dc23c.zip
[SPARK-10530][CORE] Kill other task attempts when one task attempt belonging to the same task has succeeded in speculation
## What changes were proposed in this pull request? With this patch, TaskSetManager kills the other running attempts when any one of the attempts succeeds for the same task. Killed tasks are also no longer counted as failed tasks: they are listed separately in the UI with the task state KILLED instead of FAILED. ## How was this patch tested? core\src\test\scala\org\apache\spark\ui\jobs\JobProgressListenerSuite.scala core\src\test\scala\org\apache\spark\util\JsonProtocolSuite.scala I have verified this patch manually by setting spark.speculation to true: when any attempt succeeds, the other running attempts for the same task are killed and pending tasks are assigned in their place. Likewise, when an attempt is killed it is recorded as a KILLED task rather than a FAILED task. Please see the attached screenshots for reference. ![stage-tasks-table](https://cloud.githubusercontent.com/assets/3174804/14075132/394c6a12-f4f4-11e5-8638-20ff7b8cc9bc.png) ![stages-table](https://cloud.githubusercontent.com/assets/3174804/14075134/3b60f412-f4f4-11e5-9ea6-dd0dcc86eb03.png) Ref : https://github.com/apache/spark/pull/11916 Author: Devaraj K <devaraj@apache.org> Closes #11996 from devaraj-kavali/SPARK-10530.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala1
2 files changed, 2 insertions, 0 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index c024b4ef7e..1352ca1c4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -97,6 +97,7 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
+ killed = 0,
total = batch.outputOperations.size)
}
</td>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 60122b4813..1a87fc790f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -146,6 +146,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
completed = sparkJob.numCompletedTasks,
failed = sparkJob.numFailedTasks,
skipped = sparkJob.numSkippedTasks,
+ killed = sparkJob.numKilledTasks,
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>