aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2016-04-01 15:15:16 -0700
committerMichael Armbrust <michael@databricks.com>2016-04-01 15:15:16 -0700
commit0fc4aaa71c5b4531b3a7c8ac71d62ea8e66b6f0c (patch)
tree5c72eb22fb2ef033a6d08f989dcf4fa18d66a84f /sql/hive/src/main
parent0b7d4966ca7e02f351c4b92a74789cef4799fcb1 (diff)
downloadspark-0fc4aaa71c5b4531b3a7c8ac71d62ea8e66b6f0c.tar.gz
spark-0fc4aaa71c5b4531b3a7c8ac71d62ea8e66b6f0c.tar.bz2
spark-0fc4aaa71c5b4531b3a7c8ac71d62ea8e66b6f0c.zip
[SPARK-14255][SQL] Streaming Aggregation
This PR adds the ability to perform aggregations inside of a `ContinuousQuery`. In order to implement this feature, the planning of aggregation has augmented with a new `StatefulAggregationStrategy`. Unlike batch aggregation, stateful-aggregation uses the `StateStore` (introduced in #11645) to persist the results of partial aggregation across different invocations. The resulting physical plan performs the aggregation using the following progression: - Partial Aggregation - Shuffle - Partial Merge (now there is at most 1 tuple per group) - StateStoreRestore (now there is 1 tuple from this batch + optionally one from the previous) - Partial Merge (now there is at most 1 tuple per group) - StateStoreSave (saves the tuple for the next batch) - Complete (output the current result of the aggregation) The following refactoring was also performed to allow us to plug into existing code: - The get/put implementation is taken from #12013 - The logic for breaking down and de-duping the physical execution of aggregation has been move into a new pattern `PhysicalAggregation` - The `AttributeReference` used to identify the result of an `AggregateFunction` as been moved into the `AggregateExpression` container. This change moves the reference into the same object as the other intermediate references used in aggregation and eliminates the need to pass around a `Map[(AggregateFunction, Boolean), Attribute]`. Further clean up (using a different aggregation container for logical/physical plans) is deferred to a followup. - Some planning logic is moved from the `SessionState` into the `QueryExecution` to make it easier to override in the streaming case. - The ability to write a `StreamTest` that checks only the output of the last batch has been added to simulate the future addition of output modes. Author: Michael Armbrust <michael@databricks.com> Closes #12048 from marmbrus/statefulAgg.
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 2bdb428e9d..ff40c366c8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -77,8 +77,9 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx)
/**
* Planner that takes into account Hive-specific strategies.
*/
- override lazy val planner: SparkPlanner = {
- new SparkPlanner(ctx.sparkContext, conf, experimentalMethods) with HiveStrategies {
+ override def planner: SparkPlanner = {
+ new SparkPlanner(ctx.sparkContext, conf, experimentalMethods.extraStrategies)
+ with HiveStrategies {
override val hiveContext = ctx
override def strategies: Seq[Strategy] = {