diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2016-05-25 12:02:07 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-05-25 12:02:07 -0700 |
commit | 4b88067416ce922ae15a1445cf953fb9b5c43427 (patch) | |
tree | 2722d836730b7b743d68b0094cda0874c484f577 /sql | |
parent | 1cb347fbc446092b478ae0578fc7d1b0626a9294 (diff) | |
download | spark-4b88067416ce922ae15a1445cf953fb9b5c43427.tar.gz spark-4b88067416ce922ae15a1445cf953fb9b5c43427.tar.bz2 spark-4b88067416ce922ae15a1445cf953fb9b5c43427.zip |
[SPARK-15483][SQL] IncrementalExecution should use extra strategies.
## What changes were proposed in this pull request?
Extra strategies does not work for streams because `IncrementalExecution` uses modified planner with stateful operations but it does not include extra strategies.
This pr fixes `IncrementalExecution` to include extra strategies to use them.
## How was this patch tested?
I added a test to check if extra strategies work for streams.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #13261 from ueshin/issues/SPARK-15483.
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala | 3 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 15 |
2 files changed, 17 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 8b96f65bc3..fe5f36e1cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -36,7 +36,8 @@ class IncrementalExecution private[sql]( extends QueryExecution(sparkSession, logicalPlan) { // TODO: make this always part of planning. - val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil + val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy +: + sparkSession.sessionState.experimentalMethods.extraStrategies // Modified planner with stateful operations. override def planner: SparkPlanner = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index b742206b58..ae89a6887a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -220,6 +220,21 @@ class StreamSuite extends StreamTest with SharedSQLContext { CheckOffsetLogLatestBatchId(2), CheckSinkLatestBatchId(2)) } + + test("insert an extraStrategy") { + try { + spark.experimental.extraStrategies = TestStrategy :: Nil + + val inputData = MemoryStream[(String, Int)] + val df = inputData.toDS().map(_._1).toDF("a") + + testStream(df)( + AddData(inputData, ("so slow", 1)), + CheckAnswer("so fast")) + } finally { + spark.experimental.extraStrategies = Nil + } + } } /** |