aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-04-26 14:51:14 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-26 14:51:14 -0700
commit9797cc20c0b8fb34659df11af8eccb9ed293c52c (patch)
treef62e4ed5846ca15b3c007c8b31e983c6b286d515
parent6c5a837c509233d4008cffeaede111f17fea5289 (diff)
downloadspark-9797cc20c0b8fb34659df11af8eccb9ed293c52c.tar.gz
spark-9797cc20c0b8fb34659df11af8eccb9ed293c52c.tar.bz2
spark-9797cc20c0b8fb34659df11af8eccb9ed293c52c.zip
[SPARK-14929] [SQL] Disable vectorized map for wide schemas & high-precision decimals
## What changes were proposed in this pull request? While the vectorized hash map in `TungstenAggregate` is currently supported for all primitive data types during partial aggregation, this patch only enables the hash map for a subset of cases that've been verified to show performance improvements on our benchmarks subject to an internal conf that sets an upper limit on the maximum length of the aggregate key/value schema. This list of supported use-cases should be expanded over time. ## How was this patch tested? This is no new change in functionality so existing tests should suffice. Performance tests were done on TPCDS benchmarks. Author: Sameer Agarwal <sameer@databricks.com> Closes #12710 from sameeragarwal/vectorized-enable.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala35
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala16
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala20
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala15
4 files changed, 55 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 49db75e141..16362f756f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -447,16 +447,37 @@ case class TungstenAggregate(
}
}
+ /**
+ * Using the vectorized hash map in TungstenAggregate is currently supported for all primitive
+ * data types during partial aggregation. However, we currently only enable the hash map for a
+ * subset of cases that've been verified to show performance improvements on our benchmarks
+ * subject to an internal conf that sets an upper limit on the maximum length of the aggregate
+ * key/value schema.
+ *
+ * This list of supported use-cases should be expanded over time.
+ */
+ private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = {
+ val schemaLength = (groupingKeySchema ++ bufferSchema).length
+ val isSupported =
+ (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
+ f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
+ bufferSchema.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge)
+
+ // We do not support byte array based decimal type for aggregate values as
+ // ColumnVector.putDecimal for high-precision decimals doesn't currently support in-place
+ // updates. Due to this, appending the byte array in the vectorized hash map can turn out to be
+ // quite inefficient and can potentially OOM the executor.
+ val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType])
+ .forall(!DecimalType.isByteArrayDecimalType(_))
+
+ isSupported && isNotByteArrayDecimalType &&
+ schemaLength <= sqlContext.conf.vectorizedAggregateMapMaxColumns
+ }
+
private def doProduceWithKeys(ctx: CodegenContext): String = {
val initAgg = ctx.freshName("initAgg")
ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
-
- // Enable vectorized hash map for all primitive data types during partial aggregation
- isVectorizedHashMapEnabled = sqlContext.conf.columnarAggregateMapEnabled &&
- (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
- f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
- bufferSchema.forall(!_.dataType.isInstanceOf[StringType]) && bufferSchema.nonEmpty &&
- modes.forall(mode => mode == Partial || mode == PartialMerge)
+ isVectorizedHashMapEnabled = enableVectorizedHashMap(ctx)
vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap")
val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap")
val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bbc424cba6..b268a4fef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,12 +484,14 @@ object SQLConf {
.intConf
.createWithDefault(40)
- // TODO: This is still WIP and shouldn't be turned on without extensive test coverage
- val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
- .internal()
- .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.")
- .booleanConf
- .createWithDefault(false)
+ val VECTORIZED_AGG_MAP_MAX_COLUMNS =
+ SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max")
+ .internal()
+ .doc("Sets the maximum width of schema (aggregate keys + values) for which aggregate with" +
+ "keys uses an in-memory columnar map to speed up execution. Setting this to 0 effectively" +
+ "disables the columnar map")
+ .intConf
+ .createWithDefault(3)
val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
.internal()
@@ -644,7 +646,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
- def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
+ def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 7a120b9374..841263d3da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -163,13 +163,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -201,13 +201,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -238,13 +238,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -275,13 +275,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -322,13 +322,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10")
f()
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index b16c9c133b..0ba72b033f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -967,8 +967,9 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite
class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
- Seq(false, true).foreach { enableColumnarHashMap =>
- withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) {
+ Seq(0, 10).foreach { maxColumnarHashMapColumns =>
+ withSQLConf("spark.sql.codegen.aggregate.map.columns.max" ->
+ maxColumnarHashMapColumns.toString) {
(1 to 3).foreach { fallbackStartsAt =>
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {
@@ -981,11 +982,11 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
case Some(errorMessage) =>
val newErrorMessage =
s"""
- |The following aggregation query failed when using TungstenAggregate with
- |controlled fallback (it falls back to bytes to bytes map once it has processed
- |${fallbackStartsAt -1} input rows and to sort-based aggregation once it has
- |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
- |
+ |The following aggregation query failed when using TungstenAggregate with
+ |controlled fallback (it falls back to bytes to bytes map once it has processed
+ |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has
+ |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
+ |
|$errorMessage
""".stripMargin