author | Michael Armbrust <michael@databricks.com> | 2016-04-01 15:15:16 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-04-01 15:15:16 -0700 |
commit | 0fc4aaa71c5b4531b3a7c8ac71d62ea8e66b6f0c (patch) | |
tree | 5c72eb22fb2ef033a6d08f989dcf4fa18d66a84f /sql/hive/src | |
parent | 0b7d4966ca7e02f351c4b92a74789cef4799fcb1 (diff) | |
[SPARK-14255][SQL] Streaming Aggregation
This PR adds the ability to perform aggregations inside of a `ContinuousQuery`. To implement this feature, the planning of aggregation has been augmented with a new `StatefulAggregationStrategy`. Unlike batch aggregation, stateful aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression:
- Partial Aggregation
- Shuffle
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous)
- Partial Merge (now there is at most 1 tuple per group)
- StateStoreSave (saves the tuple for the next batch)
- Complete (output the current result of the aggregation)
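The progression above can be sketched with plain Scala collections. This is an illustrative simulation only, not the code added by this PR: `StatefulCountSketch`, `partialMerge`, and `runBatch` are hypothetical names, a mutable `Map` stands in for the `StateStore`, the aggregate is a simple count, and the choice to emit only groups seen in the current batch is an assumption of the sketch.

```scala
import scala.collection.mutable

// Illustrative simulation only: plain collections stand in for Spark's
// physical operators, and a mutable Map stands in for the StateStore.
object StatefulCountSketch {
  type Key = String
  type Buffer = Long // intermediate aggregation buffer for a count

  // Hypothetical stand-in for the StateStore: one saved buffer per group.
  val stateStore: mutable.Map[Key, Buffer] = mutable.Map.empty

  // Merge buffers that share a key, leaving at most one tuple per group.
  def partialMerge(rows: Seq[(Key, Buffer)]): Seq[(Key, Buffer)] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }.toSeq

  def runBatch(partitions: Seq[Seq[Key]]): Map[Key, Long] = {
    // 1. Partial aggregation: each input partition counts its own keys.
    val partial = partitions.map(p => partialMerge(p.map(_ -> 1L)))
    // 2. Shuffle: bring the partial buffers for each key together
    //    (modelled here by flattening into one collection).
    val shuffled = partial.flatten
    // 3. Partial merge: at most one tuple per group for this batch.
    val merged = partialMerge(shuffled)
    // 4. StateStoreRestore: add the tuple saved by earlier batches, if any.
    val restored = merged ++ merged.flatMap { case (k, _) =>
      stateStore.get(k).map(saved => k -> saved)
    }
    // 5. Partial merge: again at most one tuple per group.
    val combined = partialMerge(restored)
    // 6. StateStoreSave: persist the tuple for the next batch.
    combined.foreach { case (k, v) => stateStore(k) = v }
    // 7. Complete: output the current result of the aggregation.
    combined.toMap
  }
}
```

Calling `runBatch` twice with overlapping keys shows the running counts carrying over between invocations, which is the behaviour the restore and save steps exist to provide.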
The following refactoring was also performed to allow us to plug into existing code:
- The get/put implementation is taken from #12013
- The logic for breaking down and de-duping the physical execution of aggregation has been moved into a new pattern `PhysicalAggregation`
- The `AttributeReference` used to identify the result of an `AggregateFunction` has been moved into the `AggregateExpression` container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`. Further cleanup (using a different aggregation container for logical/physical plans) is deferred to a followup.
- Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case.
- The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes.
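The `AggregateExpression` change in the list above can be illustrated with a minimal sketch. The types below are simplified, hypothetical stand-ins and not Spark's actual class definitions; in particular, the field names `isDistinct` and `resultAttribute`, and the reading of the `Boolean` in the map key as a distinct flag, are assumptions made for the example.

```scala
// Simplified, hypothetical stand-ins; not Spark's actual class definitions.
final case class Attribute(name: String)
final case class AggregateFunction(name: String)

object Before {
  // The container does not know which attribute holds its result.
  final case class AggregateExpression(function: AggregateFunction, isDistinct: Boolean)

  // Callers have to thread a side map through the planning code.
  def resultOf(
      expr: AggregateExpression,
      results: Map[(AggregateFunction, Boolean), Attribute]): Attribute =
    results((expr.function, expr.isDistinct))
}

object After {
  // The result reference lives in the container itself.
  final case class AggregateExpression(
      function: AggregateFunction,
      isDistinct: Boolean,
      resultAttribute: Attribute)

  // No side map is needed to find the result attribute.
  def resultOf(expr: AggregateExpression): Attribute = expr.resultAttribute
}
```

The design point is that the lookup key and the value now travel together, so code that handles an aggregate expression no longer needs an extra parameter to find its result.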
Author: Michael Armbrust <michael@databricks.com>
Closes #12048 from marmbrus/statefulAgg.
Diffstat (limited to 'sql/hive/src')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 5 |
1 files changed, 3 insertions, 2 deletions
    diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
    index 2bdb428e9d..ff40c366c8 100644
    --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
    +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
    @@ -77,8 +77,9 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
       /**
        * Planner that takes into account Hive-specific strategies.
        */
    -  override lazy val planner: SparkPlanner = {
    -    new SparkPlanner(ctx.sparkContext, conf, experimentalMethods) with HiveStrategies {
    +  override def planner: SparkPlanner = {
    +    new SparkPlanner(ctx.sparkContext, conf, experimentalMethods.extraStrategies)
    +      with HiveStrategies {
           override val hiveContext = ctx

           override def strategies: Seq[Strategy] = {
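The hunk above replaces a `lazy val` member with a `def`. As a general Scala note, and not a statement of the author's motivation, the sketch below shows the observable difference between the two: a `lazy val` is computed once and cached, while a `def` is re-evaluated on every call. All names here (`PlannerMemberSketch`, `Planner`, `Session`, `cachedPlanner`, `freshPlanner`) are hypothetical and unrelated to Spark's real classes.

```scala
// Hypothetical names throughout; this only illustrates lazy val versus def.
object PlannerMemberSketch {
  final class Planner(val strategies: Seq[String])

  class Session {
    // Mutable setting that the planner is built from.
    var extraStrategies: Seq[String] = Seq.empty

    // Built on first access, then cached: later changes are not seen.
    lazy val cachedPlanner: Planner = new Planner(extraStrategies)

    // Rebuilt on every call: always reflects the current setting.
    def freshPlanner: Planner = new Planner(extraStrategies)
  }
}
```

After `extraStrategies` is reassigned, `freshPlanner` returns a planner built from the new value, whereas `cachedPlanner` keeps whatever it captured on first access.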