4 files changed, 55 insertions, 31 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 49db75e141..16362f756f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -447,16 +447,37 @@ case class TungstenAggregate(
     }
   }
 
+  /**
+   * Using the vectorized hash map in TungstenAggregate is currently supported for all primitive
+   * data types during partial aggregation. However, we currently only enable the hash map for a
+   * subset of cases that have been verified to show performance improvements on our benchmarks,
+   * subject to an internal conf that sets an upper limit on the maximum length of the aggregate
+   * key/value schema.
+   *
+   * This list of supported use-cases should be expanded over time.
+   */
+  private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = {
+    val schemaLength = (groupingKeySchema ++ bufferSchema).length
+    val isSupported =
+      (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
+        f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
+        bufferSchema.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge)
+
+    // We do not support byte array based decimal type for aggregate values as
+    // ColumnVector.putDecimal for high-precision decimals doesn't currently support in-place
+    // updates. Due to this, appending the byte array in the vectorized hash map can turn out to be
+    // quite inefficient and can potentially OOM the executor.
+    val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType])
+      .forall(!DecimalType.isByteArrayDecimalType(_))
+
+    isSupported && isNotByteArrayDecimalType &&
+      schemaLength <= sqlContext.conf.vectorizedAggregateMapMaxColumns
+  }
+
   private def doProduceWithKeys(ctx: CodegenContext): String = {
     val initAgg = ctx.freshName("initAgg")
     ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
-
-    // Enable vectorized hash map for all primitive data types during partial aggregation
-    isVectorizedHashMapEnabled = sqlContext.conf.columnarAggregateMapEnabled &&
-      (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
-        f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
-      bufferSchema.forall(!_.dataType.isInstanceOf[StringType]) && bufferSchema.nonEmpty &&
-      modes.forall(mode => mode == Partial || mode == PartialMerge)
+    isVectorizedHashMapEnabled = enableVectorizedHashMap(ctx)
     vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap")
     val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap")
     val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
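Note: the new enableVectorizedHashMap predicate gates the vectorized map on three conditions: every key/value type is supported (primitives, decimals, strings), the aggregation runs only in Partial/PartialMerge mode, and the combined key+value schema is no wider than the internal cap. The following is a minimal, self-contained Scala sketch of that decision logic; DataType, Mode, and the 18-digit decimal cutoff (standing in for DecimalType.isByteArrayDecimalType and Decimal.MAX_LONG_DIGITS) are simplified stand-ins, not Spark's actual classes.

object VectorizedMapGateSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object LongType extends DataType
  case object StringType extends DataType
  final case class DecimalType(precision: Int) extends DataType

  sealed trait Mode
  case object Partial extends Mode
  case object PartialMerge extends Mode
  case object Final extends Mode

  private def isPrimitive(dt: DataType): Boolean = dt == IntType || dt == LongType

  def enableVectorizedHashMap(
      groupingKeySchema: Seq[DataType],
      bufferSchema: Seq[DataType],
      modes: Seq[Mode],
      maxColumns: Int): Boolean = {
    val schema = groupingKeySchema ++ bufferSchema
    // Check 1 + 2: supported types only, and partial-side aggregation only.
    val isSupported =
      schema.forall(f => isPrimitive(f) || f.isInstanceOf[DecimalType] || f == StringType) &&
        bufferSchema.nonEmpty && modes.forall(m => m == Partial || m == PartialMerge)

    // Stand-in for the byte-array decimal check: precisions above 18 digits no
    // longer fit in a long and become byte-array backed, which the vectorized
    // map cannot update in place.
    val isNotByteArrayDecimalType =
      bufferSchema.collect { case d: DecimalType => d }.forall(_.precision <= 18)

    // Check 3: total key + value width must stay under the configured cap.
    isSupported && isNotByteArrayDecimalType && schema.length <= maxColumns
  }

  def main(args: Array[String]): Unit = {
    // 1 key + 1 buffer column = width 2, within the default cap of 3: enabled.
    println(enableVectorizedHashMap(Seq(IntType), Seq(LongType), Seq(Partial), 3))
    // Width 4 exceeds the cap: disabled.
    println(enableVectorizedHashMap(
      Seq(IntType, IntType), Seq(LongType, LongType), Seq(Partial), 3))
    // High-precision (byte-array backed) decimal buffer: disabled.
    println(enableVectorizedHashMap(Seq(IntType), Seq(DecimalType(20)), Seq(Partial), 3))
  }
}

The three calls print true, false, false: enabled for a narrow primitive schema, disabled once the width exceeds the cap, and disabled for byte-array-backed decimal buffers.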
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bbc424cba6..b268a4fef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,12 +484,14 @@ object SQLConf {
     .intConf
     .createWithDefault(40)
 
-  // TODO: This is still WIP and shouldn't be turned on without extensive test coverage
-  val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
-    .internal()
-    .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.")
-    .booleanConf
-    .createWithDefault(false)
+  val VECTORIZED_AGG_MAP_MAX_COLUMNS =
+    SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max")
+      .internal()
+      .doc("Sets the maximum width of schema (aggregate keys + values) for which aggregate " +
+        "with keys uses an in-memory columnar map to speed up execution. Setting this to 0 " +
+        "effectively disables the columnar map.")
+      .intConf
+      .createWithDefault(3)
 
   val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
     .internal()
@@ -644,7 +646,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
 
-  def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
+  def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
 
   def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
 
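Note: the boolean spark.sql.codegen.aggregate.map.enabled is replaced by an integer cap, spark.sql.codegen.aggregate.map.columns.max, on the combined width of grouping keys plus aggregation-buffer columns. A hedged usage sketch against the SQLContext API of this era follows; the app setup and dataset are illustrative only. groupBy("k") contributes one key column and sum("v") a single buffer column, so the width of 2 stays under the default cap of 3.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.sum

object ColumnsMaxDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ColumnsMaxDemo").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 1000).map(i => (i % 10, i.toLong)).toDF("k", "v")

    // Width = grouping keys + aggregate buffer columns; "k" plus sum's single
    // buffer slot gives width 2, under the default cap of 3, so the vectorized
    // map can kick in during partial aggregation.
    sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
    df.groupBy("k").agg(sum("v")).show()

    // A cap of 0 disables the vectorized map entirely.
    sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
    df.groupBy("k").agg(sum("v")).show()

    sc.stop()
  }
}

Folding the on/off switch into the cap means one knob can both disable the map (0) and widen it for experiments, which is exactly how the benchmark and test changes below use it.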
sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3") f() } @@ -322,13 +322,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.addCase(s"codegen = T hashmap = F") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0") f() } benchmark.addCase(s"codegen = T hashmap = T") { iter => sqlContext.setConf("spark.sql.codegen.wholeStage", "true") - sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true") + sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10") f() } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala index b16c9c133b..0ba72b033f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala @@ -967,8 +967,9 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite { override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = { - Seq(false, true).foreach { enableColumnarHashMap => - withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) { + Seq(0, 10).foreach { maxColumnarHashMapColumns => + withSQLConf("spark.sql.codegen.aggregate.map.columns.max" -> + maxColumnarHashMapColumns.toString) { (1 to 3).foreach { fallbackStartsAt => withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" -> s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") { @@ -981,11 +982,11 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue case Some(errorMessage) => val newErrorMessage = s""" - |The following aggregation query failed when using TungstenAggregate with - |controlled fallback (it falls back to bytes to bytes map once it has processed - |${fallbackStartsAt -1} input rows and to sort-based aggregation once it has - |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution} - | + |The following aggregation query failed when using TungstenAggregate with + |controlled fallback (it falls back to bytes to bytes map once it has processed + |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has + |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution} + | |$errorMessage """.stripMargin |