author    Davies Liu <davies@databricks.com>    2015-12-13 22:57:01 -0800
committer Yin Huai <yhuai@databricks.com>       2015-12-13 22:57:01 -0800
commit 834e71489bf560302f9d743dff669df1134e9b74 (patch)
tree   82da4c3367d9b80ba34a7bda70657f2fe4f6834a /sql/hive
parent 2aecda284e22ec608992b6221e2f5ffbd51fcd24 (diff)
[SPARK-12213][SQL] use multiple partitions for single distinct query
Currently, we can generate two different plans for a query with a single distinct aggregate (depending on spark.sql.specializeSingleDistinctAggPlanning): one works better on low-cardinality columns, the other (the default) works better on high-cardinality columns. This PR changes the planner to generate a single plan (three aggregations and two exchanges) that works well in both cases, so we can safely remove the flag `spark.sql.specializeSingleDistinctAggPlanning` (introduced in 1.6).

A query like `SELECT COUNT(DISTINCT a) FROM table` will be planned as:

```
AGG-4 (count distinct)
  Shuffle to a single reducer
    Partial-AGG-3 (count distinct, no grouping)
      Partial-AGG-2 (grouping on a)
        Shuffle by a
          Partial-AGG-1 (grouping on a)
```

This PR also includes a large refactoring of aggregation (removing 500+ lines of code).

cc yhuai nongli marmbrus

Author: Davies Liu <davies@databricks.com>

Closes #10228 from davies/single_distinct.
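As a rough illustration (not part of this patch), the plan above amounts to rewriting the single-distinct aggregate into two grouping passes: deduplicate the column first, then count the surviving values. The sketch below borrows table `agg2` and column `value1` from the test suite changed in this commit; the alias `deduped` is hypothetical.

```
// A hedged sketch, not code from this patch. The inner GROUP BY plays the
// role of Partial-AGG-1/2 (dedup per partition, then across the shuffle on
// value1); the outer count plays Partial-AGG-3/AGG-4 (final count after the
// shuffle to a single reducer). count(value1) skips NULLs, matching the
// semantics of count(DISTINCT value1).
checkAnswer(
  sqlContext.sql(
    """
      |SELECT count(value1) FROM (
      |  SELECT value1 FROM agg2 GROUP BY value1
      |) deduped
    """.stripMargin),
  sqlContext.sql("SELECT count(DISTINCT value1) FROM agg2").collect().toSeq)
```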
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala | 142
1 file changed, 67 insertions, 75 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 064c0004b8..5550198c02 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
-import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
@@ -552,80 +551,73 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
}
test("single distinct column set") {
- Seq(true, false).foreach { specializeSingleDistinctAgg =>
- val conf =
- (SQLConf.SPECIALIZE_SINGLE_DISTINCT_AGG_PLANNING.key,
- specializeSingleDistinctAgg.toString)
- withSQLConf(conf) {
- // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword.
- checkAnswer(
- sqlContext.sql(
- """
- |SELECT
- | min(distinct value1),
- | sum(distinct value1),
- | avg(value1),
- | avg(value2),
- | max(distinct value1)
- |FROM agg2
- """.stripMargin),
- Row(-60, 70.0, 101.0/9.0, 5.6, 100))
-
- checkAnswer(
- sqlContext.sql(
- """
- |SELECT
- | mydoubleavg(distinct value1),
- | avg(value1),
- | avg(value2),
- | key,
- | mydoubleavg(value1 - 1),
- | mydoubleavg(distinct value1) * 0.1,
- | avg(value1 + value2)
- |FROM agg2
- |GROUP BY key
- """.stripMargin),
- Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) ::
- Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) ::
- Row(null, null, 3.0, 3, null, null, null) ::
- Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil)
-
- checkAnswer(
- sqlContext.sql(
- """
- |SELECT
- | key,
- | mydoubleavg(distinct value1),
- | mydoublesum(value2),
- | mydoublesum(distinct value1),
- | mydoubleavg(distinct value1),
- | mydoubleavg(value1)
- |FROM agg2
- |GROUP BY key
- """.stripMargin),
- Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) ::
- Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) ::
- Row(3, null, 3.0, null, null, null) ::
- Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil)
-
- checkAnswer(
- sqlContext.sql(
- """
- |SELECT
- | count(value1),
- | count(*),
- | count(1),
- | count(DISTINCT value1),
- | key
- |FROM agg2
- |GROUP BY key
- """.stripMargin),
- Row(3, 3, 3, 2, 1) ::
- Row(3, 4, 4, 2, 2) ::
- Row(0, 2, 2, 0, 3) ::
- Row(3, 4, 4, 3, null) :: Nil)
- }
- }
+ // DISTINCT is not meaningful with Max and Min, so we just ignore the DISTINCT keyword.
+ checkAnswer(
+ sqlContext.sql(
+ """
+ |SELECT
+ | min(distinct value1),
+ | sum(distinct value1),
+ | avg(value1),
+ | avg(value2),
+ | max(distinct value1)
+ |FROM agg2
+ """.stripMargin),
+ Row(-60, 70.0, 101.0/9.0, 5.6, 100))
+
+ checkAnswer(
+ sqlContext.sql(
+ """
+ |SELECT
+ | mydoubleavg(distinct value1),
+ | avg(value1),
+ | avg(value2),
+ | key,
+ | mydoubleavg(value1 - 1),
+ | mydoubleavg(distinct value1) * 0.1,
+ | avg(value1 + value2)
+ |FROM agg2
+ |GROUP BY key
+ """.stripMargin),
+ Row(120.0, 70.0/3.0, -10.0/3.0, 1, 67.0/3.0 + 100.0, 12.0, 20.0) ::
+ Row(100.0, 1.0/3.0, 1.0, 2, -2.0/3.0 + 100.0, 10.0, 2.0) ::
+ Row(null, null, 3.0, 3, null, null, null) ::
+ Row(110.0, 10.0, 20.0, null, 109.0, 11.0, 30.0) :: Nil)
+
+ checkAnswer(
+ sqlContext.sql(
+ """
+ |SELECT
+ | key,
+ | mydoubleavg(distinct value1),
+ | mydoublesum(value2),
+ | mydoublesum(distinct value1),
+ | mydoubleavg(distinct value1),
+ | mydoubleavg(value1)
+ |FROM agg2
+ |GROUP BY key
+ """.stripMargin),
+ Row(1, 120.0, -10.0, 40.0, 120.0, 70.0/3.0 + 100.0) ::
+ Row(2, 100.0, 3.0, 0.0, 100.0, 1.0/3.0 + 100.0) ::
+ Row(3, null, 3.0, null, null, null) ::
+ Row(null, 110.0, 60.0, 30.0, 110.0, 110.0) :: Nil)
+
+ checkAnswer(
+ sqlContext.sql(
+ """
+ |SELECT
+ | count(value1),
+ | count(*),
+ | count(1),
+ | count(DISTINCT value1),
+ | key
+ |FROM agg2
+ |GROUP BY key
+ """.stripMargin),
+ Row(3, 3, 3, 2, 1) ::
+ Row(3, 4, 4, 2, 2) ::
+ Row(0, 2, 2, 0, 3) ::
+ Row(3, 4, 4, 3, null) :: Nil)
}
test("single distinct multiple columns set") {