author    Sameer Agarwal <sameer@databricks.com>  2016-04-14 20:57:03 -0700
committer Yin Huai <yhuai@databricks.com>  2016-04-14 20:57:03 -0700
commit    b5c60bcdca3bcace607b204a6c196a5386e8a896 (patch)
tree      c21ab9836b0624bae41daa23e14de04dc1de7179 /sql/hive
parent    ff9ae61a3b7bbbfc2aac93a99c05a9e1ea9c08bc (diff)
[SPARK-14447][SQL] Speed up TungstenAggregate w/ keys using VectorizedHashMap
## What changes were proposed in this pull request?

This patch speeds up group-by aggregates by around 3-5x by leveraging an in-memory `AggregateHashMap` (please see https://github.com/apache/spark/pull/12161), an append-only aggregate hash map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates (falling back to the `BytesToBytesMap` if a given key isn't found).

Architecturally, it is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the key-value pairs. The index lookups in the array rely on linear probing (with a small maximum number of tries) and use an inexpensive hash function, which makes it very efficient for the majority of lookups. However, linear probing and an inexpensive hash function also make it less robust than the `BytesToBytesMap` (especially for a large number of keys, or for certain key distributions), which requires us to fall back on the latter for correctness.

## How was this patch tested?

    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
    Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz

    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    codegen = F                              2124 / 2204          9.9         101.3       1.0X
    codegen = T hashmap = F                  1198 / 1364         17.5          57.1       1.8X
    codegen = T hashmap = T                   369 /  600         56.8          17.6       5.8X

Author: Sameer Agarwal <sameer@databricks.com>

Closes #12345 from sameeragarwal/tungsten-aggregate-integration.
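The design described above can be sketched as follows. This is a hypothetical, much-simplified illustration (all names are mine, not Spark's actual `VectorizedHashMap`, which is code-generated and backed by a columnar batch): an append-only, power-of-2-sized map with bounded linear probing and a cheap multiply-shift hash, where a miss after a fixed number of probes signals the caller to fall back to a slower but robust map (the `BytesToBytesMap` in the patch).

```java
import java.util.OptionalLong;

// Hypothetical sketch of the fast-path aggregate map: append-only,
// power-of-2 capacity, bounded linear probing, inexpensive hash.
class LinearProbingAggMap {
    private final int mask, maxSteps;
    private final long[] keys, sums;       // stand-in for the columnar batch
    private final boolean[] used;

    LinearProbingAggMap(int capacityLog2, int maxSteps) {
        int capacity = 1 << capacityLog2;  // power of 2 => cheap index masking
        this.mask = capacity - 1;
        this.maxSteps = maxSteps;
        this.keys = new long[capacity];
        this.sums = new long[capacity];
        this.used = new boolean[capacity];
    }

    // Inexpensive hash: a single multiply-shift, far cheaper than Murmur3.
    private int hash(long key) {
        return (int) ((key * 0x9E3779B97F4A7C15L) >>> 40) & mask;
    }

    /** Adds value to the running sum for key; false means "fall back". */
    boolean add(long key, long value) {
        int idx = hash(key);
        for (int steps = 0; steps < maxSteps; steps++) {
            if (!used[idx]) {              // empty slot: append the new key
                used[idx] = true; keys[idx] = key; sums[idx] = value;
                return true;
            } else if (keys[idx] == key) { // hit: aggregate in place
                sums[idx] += value;
                return true;
            }
            idx = (idx + 1) & mask;        // linear probe to the next slot
        }
        return false;                      // too many collisions: fall back
    }

    OptionalLong get(long key) {
        int idx = hash(key);
        for (int steps = 0; steps < maxSteps; steps++) {
            if (!used[idx]) return OptionalLong.empty();
            if (keys[idx] == key) return OptionalLong.of(sums[idx]);
            idx = (idx + 1) & mask;
        }
        return OptionalLong.empty();
    }
}
```

The bounded probe count is the key trade-off: it keeps the worst-case lookup cost tiny, at the price of refusing some keys entirely, which is exactly why a correctness fallback is mandatory.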
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala  47
1 file changed, 26 insertions, 21 deletions
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 94fbcb7ee2..84bb7edf03 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -967,27 +967,32 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite
class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
- (0 to 2).foreach { fallbackStartsAt =>
- withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> fallbackStartsAt.toString) {
- // Create a new df to make sure its physical operator picks up
- // spark.sql.TungstenAggregate.testFallbackStartsAt.
- // todo: remove it?
- val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
-
- QueryTest.checkAnswer(newActual, expectedAnswer) match {
- case Some(errorMessage) =>
- val newErrorMessage =
- s"""
- |The following aggregation query failed when using TungstenAggregate with
- |controlled fallback (it falls back to sort-based aggregation once it has processed
- |$fallbackStartsAt input rows). The query is
- |${actual.queryExecution}
- |
- |$errorMessage
- """.stripMargin
-
- fail(newErrorMessage)
- case None =>
+ Seq(false, true).foreach { enableColumnarHashMap =>
+ withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) {
+ (1 to 3).foreach { fallbackStartsAt =>
+ withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
+ s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {
+ // Create a new df to make sure its physical operator picks up
+ // spark.sql.TungstenAggregate.testFallbackStartsAt.
+ // todo: remove it?
+ val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
+
+ QueryTest.checkAnswer(newActual, expectedAnswer) match {
+ case Some(errorMessage) =>
+ val newErrorMessage =
+ s"""
+ |The following aggregation query failed when using TungstenAggregate with
+ |controlled fallback (it falls back to bytes to bytes map once it has processed
+ |${fallbackStartsAt -1} input rows and to sort-based aggregation once it has
+ |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
+ |
+ |$errorMessage
+ """.stripMargin
+
+ fail(newErrorMessage)
+ case None => // Success
+ }
+ }
}
}
}
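The diff above also changes the shape of the `spark.sql.TungstenAggregate.testFallbackStartsAt` test config: instead of a single threshold, the suite now passes a pair `"m, n"` (built as `"${fallbackStartsAt - 1}, ${fallbackStartsAt}"`), so the test exercises both fallbacks: from the fast columnar map to `BytesToBytesMap` after `m` rows, and from `BytesToBytesMap` to sort-based aggregation after `n` rows. A hypothetical sketch of that two-stage schedule (names are mine, not Spark's):

```java
// Hypothetical illustration of the two-stage fallback the new test
// config exercises; the class and enum names are invented for clarity.
class FallbackSchedule {
    enum AggPath { COLUMNAR_MAP, BYTES_TO_BYTES_MAP, SORT_BASED }

    final int fastMapFallbackAt;  // m: rows before leaving the columnar map
    final int sortFallbackAt;     // n: rows before going sort-based

    FallbackSchedule(String conf) {
        // Parse the "m, n" string the test suite builds, e.g. "0, 1".
        String[] parts = conf.split(",");
        this.fastMapFallbackAt = Integer.parseInt(parts[0].trim());
        this.sortFallbackAt = Integer.parseInt(parts[1].trim());
    }

    AggPath pathFor(long rowsProcessed) {
        if (rowsProcessed < fastMapFallbackAt) return AggPath.COLUMNAR_MAP;
        if (rowsProcessed < sortFallbackAt) return AggPath.BYTES_TO_BYTES_MAP;
        return AggPath.SORT_BASED;
    }
}
```

Iterating `fallbackStartsAt` from 1 to 3 (as the new test loop does) drives every query through all three execution paths at least once, under both settings of `spark.sql.codegen.aggregate.map.enabled`.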