aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorTakuya UESHIN <ueshin@happy-camper.st>2016-05-25 12:02:07 -0700
committerMichael Armbrust <michael@databricks.com>2016-05-25 12:02:07 -0700
commit4b88067416ce922ae15a1445cf953fb9b5c43427 (patch)
tree2722d836730b7b743d68b0094cda0874c484f577 /sql
parent1cb347fbc446092b478ae0578fc7d1b0626a9294 (diff)
downloadspark-4b88067416ce922ae15a1445cf953fb9b5c43427.tar.gz
spark-4b88067416ce922ae15a1445cf953fb9b5c43427.tar.bz2
spark-4b88067416ce922ae15a1445cf953fb9b5c43427.zip
[SPARK-15483][SQL] IncrementalExecution should use extra strategies.
## What changes were proposed in this pull request? Extra strategies does not work for streams because `IncrementalExecution` uses modified planner with stateful operations but it does not include extra strategies. This pr fixes `IncrementalExecution` to include extra strategies to use them. ## How was this patch tested? I added a test to check if extra strategies work for streams. Author: Takuya UESHIN <ueshin@happy-camper.st> Closes #13261 from ueshin/issues/SPARK-15483.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala15
2 files changed, 17 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 8b96f65bc3..fe5f36e1cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -36,7 +36,8 @@ class IncrementalExecution private[sql](
extends QueryExecution(sparkSession, logicalPlan) {
// TODO: make this always part of planning.
- val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil
+ val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +:
+ sparkSession.sessionState.experimentalMethods.extraStrategies
// Modified planner with stateful operations.
override def planner: SparkPlanner =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b742206b58..ae89a6887a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext {
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}
+
+ test("insert an extraStrategy") {
+ try {
+ spark.experimental.extraStrategies = TestStrategy :: Nil
+
+ val inputData = MemoryStream[(String, Int)]
+ val df = inputData.toDS().map(_._1).toDF("a")
+
+ testStream(df)(
+ AddData(inputData, ("so slow", 1)),
+ CheckAnswer("so fast"))
+ } finally {
+ spark.experimental.extraStrategies = Nil
+ }
+ }
}
/**