diff options
author | Ala Luszczak <ala@databricks.com> | 2017-02-10 21:10:02 +0100 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2017-02-10 21:10:02 +0100 |
commit | d785217b791882e075ad537852d49d78fc1ca31b (patch) | |
tree | fbfb20bdc0098918327080e7dca2d88feedd6d7f /sql | |
parent | 3a43ae7c0bbce8eda98f50a97a0138f860197a98 (diff) | |
download | spark-d785217b791882e075ad537852d49d78fc1ca31b.tar.gz spark-d785217b791882e075ad537852d49d78fc1ca31b.tar.bz2 spark-d785217b791882e075ad537852d49d78fc1ca31b.zip |
[SPARK-19549] Allow providing reason for stage/job cancelling
## What changes were proposed in this pull request?
This change add an optional argument to `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide exact reason for the cancellation.
## How was this patch tested?
Adds unit test.
Author: Ala Luszczak <ala@databricks.com>
Closes #16887 from ala/cancel.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 19 |
1 files changed, 15 insertions, 4 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala index 3ebfd9ac3d..03bf2d760c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually { + import testImplicits._ test("SPARK-7150 range api") { // numSlice is greater than length @@ -137,7 +138,9 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall test("Cancelling stage in a query with Range.") { val listener = new SparkListener { override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = { - Thread.sleep(100) + eventually(timeout(10.seconds)) { + assert(DataFrameRangeSuite.isTaskStarted) + } sparkContext.cancelStage(taskStart.stageId) } } @@ -145,9 +148,12 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall sparkContext.addSparkListener(listener) for (codegen <- Seq(true, false)) { withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) { + DataFrameRangeSuite.isTaskStarted = false val ex = intercept[SparkException] { - spark.range(100000L).crossJoin(spark.range(100000L)) - .toDF("a", "b").agg(sum("a"), sum("b")).collect() + spark.range(100000L).mapPartitions { x => + DataFrameRangeSuite.isTaskStarted = true + x + }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect() } ex.getCause() match { case null => @@ -155,7 +161,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall case cause: SparkException => assert(cause.getMessage().contains("cancelled")) case cause: Throwable => - fail("Expected the casue to be SparkException, got " + cause.toString() + " instead.") + fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.") } } eventually(timeout(20.seconds)) { @@ -164,3 +170,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall } } } + +object DataFrameRangeSuite { + @volatile var isTaskStarted = false +} + |