diff options
author | Ala Luszczak <ala@databricks.com> | 2017-02-13 20:07:39 +0100 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2017-02-13 20:07:39 +0100 |
commit | 0417ce8787245791342d5609446f0e2fc4c219b1 (patch) | |
tree | 846f4ec209e3d7e452e932bc092cf7c4a04b1e7f /sql/core/src/test/scala | |
parent | 1c4d10b10c78d138b55e381ec6828e04fef70d6f (diff) | |
download | spark-0417ce8787245791342d5609446f0e2fc4c219b1.tar.gz spark-0417ce8787245791342d5609446f0e2fc4c219b1.tar.bz2 spark-0417ce8787245791342d5609446f0e2fc4c219b1.zip |
[SPARK-19514] Enhancing the test for Range interruption.
Improve the test for SPARK-19514, so that it's clear which stage is being cancelled.
Author: Ala Luszczak <ala@databricks.com>
Closes #16914 from ala/fix-range-test.
Diffstat (limited to 'sql/core/src/test/scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 21 |
1 file changed, 10 insertions, 11 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 03bf2d760c..acf393a9b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -23,8 +23,8 @@ import scala.util.Random import org.scalatest.concurrent.Eventually -import org.apache.spark.SparkException -import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskStart} +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -137,23 +137,23 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually test("Cancelling stage in a query with Range.") { val listener = new SparkListener { - override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { eventually(timeout(10.seconds)) { - assert(DataFrameRangeSuite.isTaskStarted) + assert(DataFrameRangeSuite.stageToKill > 0) } - sparkContext.cancelStage(taskStart.stageId) + sparkContext.cancelStage(DataFrameRangeSuite.stageToKill) } } sparkContext.addSparkListener(listener) for (codegen <- Seq(true, false)) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { - DataFrameRangeSuite.isTaskStarted = false + DataFrameRangeSuite.stageToKill = -1 val ex = intercept[SparkException] { - spark.range(100000L).mapPartitions { x => - DataFrameRangeSuite.isTaskStarted = true + spark.range(1000000000L).map { x => + DataFrameRangeSuite.stageToKill = TaskContext.get().stageId() x - }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect() + }.toDF("id").agg(sum("id")).collect() } 
ex.getCause() match { case null => @@ -172,6 +172,5 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually } object DataFrameRangeSuite { - @volatile var isTaskStarted = false + @volatile var stageToKill = -1 } -