aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test/scala
diff options
context:
space:
mode:
authorAla Luszczak <ala@databricks.com>2017-02-13 20:07:39 +0100
committerReynold Xin <rxin@databricks.com>2017-02-13 20:07:39 +0100
commit0417ce8787245791342d5609446f0e2fc4c219b1 (patch)
tree846f4ec209e3d7e452e932bc092cf7c4a04b1e7f /sql/core/src/test/scala
parent1c4d10b10c78d138b55e381ec6828e04fef70d6f (diff)
downloadspark-0417ce8787245791342d5609446f0e2fc4c219b1.tar.gz
spark-0417ce8787245791342d5609446f0e2fc4c219b1.tar.bz2
spark-0417ce8787245791342d5609446f0e2fc4c219b1.zip
[SPARK-19514] Enhancing the test for Range interruption.
Improve the test for SPARK-19514, so that it's clear which stage is being cancelled. Author: Ala Luszczak <ala@databricks.com> Closes #16914 from ala/fix-range-test.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala21
1 files changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 03bf2d760c..acf393a9b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
import org.scalatest.concurrent.Eventually
-import org.apache.spark.SparkException
-import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -137,23 +137,23 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
test("Cancelling stage in a query with Range.") {
val listener = new SparkListener {
- override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
eventually(timeout(10.seconds)) {
- assert(DataFrameRangeSuite.isTaskStarted)
+ assert(DataFrameRangeSuite.stageToKill > 0)
}
- sparkContext.cancelStage(taskStart.stageId)
+ sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
}
}
sparkContext.addSparkListener(listener)
for (codegen <- Seq(true, false)) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
- DataFrameRangeSuite.isTaskStarted = false
+ DataFrameRangeSuite.stageToKill = -1
val ex = intercept[SparkException] {
- spark.range(100000L).mapPartitions { x =>
- DataFrameRangeSuite.isTaskStarted = true
+ spark.range(1000000000L).map { x =>
+ DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
x
- }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect()
+ }.toDF("id").agg(sum("id")).collect()
}
ex.getCause() match {
case null =>
@@ -172,6 +172,5 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
}
object DataFrameRangeSuite {
- @volatile var isTaskStarted = false
+ @volatile var stageToKill = -1
}
-