author    Ala Luszczak <ala@databricks.com>    2017-02-10 21:10:02 +0100
committer Reynold Xin <rxin@databricks.com>    2017-02-10 21:10:02 +0100
commit    d785217b791882e075ad537852d49d78fc1ca31b (patch)
tree      fbfb20bdc0098918327080e7dca2d88feedd6d7f /sql
parent    3a43ae7c0bbce8eda98f50a97a0138f860197a98 (diff)
[SPARK-19549] Allow providing reason for stage/job cancelling
## What changes were proposed in this pull request?

This change adds an optional argument to the `SparkContext.cancelStage()` and `SparkContext.cancelJob()` functions, which allows the caller to provide the exact reason for the cancellation.

## How was this patch tested?

Adds a unit test.

Author: Ala Luszczak <ala@databricks.com>

Closes #16887 from ala/cancel.
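For illustration, a minimal sketch of the extended API. Here `sc`, `stageId`, and `jobId` are assumed to be a live `SparkContext` and valid stage/job ids; only the optional reason string is new in this patch:

    // New overloads: the reason is propagated into the cancellation message.
    sc.cancelStage(stageId, "stage exceeded its time budget")
    sc.cancelJob(jobId, "result no longer needed")

    // The pre-existing no-reason forms keep working unchanged.
    sc.cancelStage(stageId)
    sc.cancelJob(jobId)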
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala | 19
1 file changed, 15 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 3ebfd9ac3d..03bf2d760c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.test.SharedSQLContext
class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventually {
+ import testImplicits._
test("SPARK-7150 range api") {
// numSlice is greater than length
@@ -137,7 +138,9 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
test("Cancelling stage in a query with Range.") {
val listener = new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
- Thread.sleep(100)
+ eventually(timeout(10.seconds)) {
+ assert(DataFrameRangeSuite.isTaskStarted)
+ }
sparkContext.cancelStage(taskStart.stageId)
}
}
@@ -145,9 +148,12 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
sparkContext.addSparkListener(listener)
for (codegen <- Seq(true, false)) {
withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegen.toString()) {
+ DataFrameRangeSuite.isTaskStarted = false
val ex = intercept[SparkException] {
- spark.range(100000L).crossJoin(spark.range(100000L))
- .toDF("a", "b").agg(sum("a"), sum("b")).collect()
+ spark.range(100000L).mapPartitions { x =>
+ DataFrameRangeSuite.isTaskStarted = true
+ x
+ }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), sum("b")).collect()
}
ex.getCause() match {
case null =>
@@ -155,7 +161,7 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
case cause: SparkException =>
assert(cause.getMessage().contains("cancelled"))
case cause: Throwable =>
- fail("Expected the casue to be SparkException, got " + cause.toString() + " instead.")
+ fail("Expected the cause to be SparkException, got " + cause.toString() + " instead.")
}
}
eventually(timeout(20.seconds)) {
@@ -164,3 +170,8 @@ class DataFrameRangeSuite extends QueryTest with SharedSQLContext with Eventuall
}
}
}
+
+object DataFrameRangeSuite {
+ @volatile var isTaskStarted = false
+}
+
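For context, the test change above replaces a fixed `Thread.sleep(100)` with a race-free handshake: the task body flips a `@volatile` flag in a companion object, and the listener waits on it via `eventually` before cancelling the stage, so the cancellation can no longer fire before any task has started. A self-contained sketch of that pattern, with hypothetical names and plain polling standing in for ScalaTest's `eventually(timeout(...))`:

    object TaskStartSync {
      // Written by the task thread, read by the listener thread,
      // so it must be @volatile for the update to be visible.
      @volatile var isTaskStarted = false

      // Poll until the flag is set or the deadline passes.
      def awaitTaskStart(timeoutMs: Long = 10000L): Unit = {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (!isTaskStarted) {
          require(System.currentTimeMillis() < deadline, "task never started")
          Thread.sleep(10)
        }
      }
    }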