aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-01-23 22:30:51 -0800
committerShixiong Zhu <shixiong@databricks.com>2017-01-23 22:30:51 -0800
commit60bd91a34078a9239fbf5e8f49ce8b680c11635d (patch)
tree19ec2a46221a113d7c129e552e0263e582683759
parente576c1ed793fe8ac6e65381dc0635413cc18470f (diff)
downloadspark-60bd91a34078a9239fbf5e8f49ce8b680c11635d.tar.gz
spark-60bd91a34078a9239fbf5e8f49ce8b680c11635d.tar.bz2
spark-60bd91a34078a9239fbf5e8f49ce8b680c11635d.zip
[SPARK-19268][SS] Disallow adaptive query execution for streaming queries
## What changes were proposed in this pull request? As adaptive query execution may change the number of partitions in different batches, it may break streaming queries. Hence, we should disallow this feature in Structured Streaming. ## How was this patch tested? `test("SPARK-19268: Adaptive query execution should be disallowed")`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #16683 from zsxwing/SPARK-19268.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala12
2 files changed, 17 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 7b9770dadd..0b9406b027 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -230,6 +230,12 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
}
+ if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
+ throw new AnalysisException(
+ s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
+ "is not supported in streaming DataFrames/Datasets")
+ }
+
new StreamingQueryWrapper(new StreamExecution(
sparkSession,
userSpecifiedName.orNull,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 8e16fd418a..f05e9d1fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -30,8 +30,9 @@ import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkException
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils
@@ -238,6 +239,15 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
+ test("SPARK-19268: Adaptive query execution should be disallowed") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+ val e = intercept[AnalysisException] {
+ MemoryStream[Int].toDS.writeStream.queryName("test-query").format("memory").start()
+ }
+ assert(e.getMessage.contains(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key) &&
+ e.getMessage.contains("not supported"))
+ }
+ }
/** Run a body of code by defining a query on each dataset */
private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {