-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala   35
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                        16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala    20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala    15
4 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 49db75e141..16362f756f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -447,16 +447,37 @@ case class TungstenAggregate(
}
}
+ /**
+ * The vectorized hash map in TungstenAggregate can support all primitive data types during
+ * partial aggregation. However, we currently enable the hash map only for a subset of cases
+ * that have been verified to show performance improvements on our benchmarks, subject to an
+ * internal conf that sets an upper limit on the maximum length of the aggregate key/value
+ * schema.
+ *
+ * This list of supported use-cases should be expanded over time.
+ */
+ private def enableVectorizedHashMap(ctx: CodegenContext): Boolean = {
+ val schemaLength = (groupingKeySchema ++ bufferSchema).length
+ val isSupported =
+ (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
+ f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
+ bufferSchema.nonEmpty && modes.forall(mode => mode == Partial || mode == PartialMerge)
+
+ // We do not support byte array based decimal type for aggregate values as
+ // ColumnVector.putDecimal for high-precision decimals doesn't currently support in-place
+ // updates. Due to this, appending the byte array in the vectorized hash map can turn out to be
+ // quite inefficient and can potentially OOM the executor.
+ val isNotByteArrayDecimalType = bufferSchema.map(_.dataType).filter(_.isInstanceOf[DecimalType])
+ .forall(!DecimalType.isByteArrayDecimalType(_))
+
+ isSupported && isNotByteArrayDecimalType &&
+ schemaLength <= sqlContext.conf.vectorizedAggregateMapMaxColumns
+ }
+
private def doProduceWithKeys(ctx: CodegenContext): String = {
val initAgg = ctx.freshName("initAgg")
ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
-
- // Enable vectorized hash map for all primitive data types during partial aggregation
- isVectorizedHashMapEnabled = sqlContext.conf.columnarAggregateMapEnabled &&
- (groupingKeySchema ++ bufferSchema).forall(f => ctx.isPrimitiveType(f.dataType) ||
- f.dataType.isInstanceOf[DecimalType] || f.dataType.isInstanceOf[StringType]) &&
- bufferSchema.forall(!_.dataType.isInstanceOf[StringType]) && bufferSchema.nonEmpty &&
- modes.forall(mode => mode == Partial || mode == PartialMerge)
+ isVectorizedHashMapEnabled = enableVectorizedHashMap(ctx)
vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap")
val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap")
val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
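For illustration, here is a minimal standalone sketch of the width gate added above. The schema here is a hypothetical stand-in; in the real code the limit comes from spark.sql.codegen.aggregate.map.columns.max (default 3), and the full check additionally verifies types and aggregation modes:

    import org.apache.spark.sql.types._

    // A two-column aggregate: one grouping key and one aggregation buffer field.
    val groupingKeySchema = StructType(Seq(StructField("key", IntegerType)))
    val bufferSchema = StructType(Seq(StructField("sum", LongType)))

    // The gate counts keys and values together against the configured maximum.
    val schemaLength = (groupingKeySchema ++ bufferSchema).length
    val withinLimit = schemaLength <= 3 // schemaLength == 2 here, so the map qualifies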
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bbc424cba6..b268a4fef7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -484,12 +484,14 @@ object SQLConf {
.intConf
.createWithDefault(40)
- // TODO: This is still WIP and shouldn't be turned on without extensive test coverage
- val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
- .internal()
- .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.")
- .booleanConf
- .createWithDefault(false)
+ val VECTORIZED_AGG_MAP_MAX_COLUMNS =
+ SQLConfigBuilder("spark.sql.codegen.aggregate.map.columns.max")
+ .internal()
+ .doc("Sets the maximum width of schema (aggregate keys + values) for which aggregate with" +
+ "keys uses an in-memory columnar map to speed up execution. Setting this to 0 effectively" +
+ "disables the columnar map")
+ .intConf
+ .createWithDefault(3)
val FILE_SINK_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSink.log.deletion")
.internal()
@@ -644,7 +646,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
override def runSQLonFile: Boolean = getConf(RUN_SQL_ON_FILES)
- def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
+ def vectorizedAggregateMapMaxColumns: Int = getConf(VECTORIZED_AGG_MAP_MAX_COLUMNS)
def variableSubstituteEnabled: Boolean = getConf(VARIABLE_SUBSTITUTE_ENABLED)
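Since the boolean flag is gone, callers now toggle the map through the width limit instead. A short usage sketch, assuming a SQLContext in scope (the same pattern the benchmark changes below use):

    // Setting the limit to 0 disables the vectorized map: no schema has width <= 0.
    sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
    // The default of 3 enables it for key+value schemas of up to three columns.
    sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")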
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 7a120b9374..841263d3da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -163,13 +163,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -201,13 +201,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -238,13 +238,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -275,13 +275,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
f()
}
@@ -322,13 +322,13 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
f()
}
benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
- sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "10")
f()
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index b16c9c133b..0ba72b033f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -967,8 +967,9 @@ class TungstenAggregationQuerySuite extends AggregationQuerySuite
class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQuerySuite {
override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
- Seq(false, true).foreach { enableColumnarHashMap =>
- withSQLConf("spark.sql.codegen.aggregate.map.enabled" -> enableColumnarHashMap.toString) {
+ Seq(0, 10).foreach { maxColumnarHashMapColumns =>
+ withSQLConf("spark.sql.codegen.aggregate.map.columns.max" ->
+ maxColumnarHashMapColumns.toString) {
(1 to 3).foreach { fallbackStartsAt =>
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
s"${(fallbackStartsAt - 1).toString}, ${fallbackStartsAt.toString}") {
@@ -981,11 +982,11 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
case Some(errorMessage) =>
val newErrorMessage =
s"""
- |The following aggregation query failed when using TungstenAggregate with
- |controlled fallback (it falls back to bytes to bytes map once it has processed
- |${fallbackStartsAt -1} input rows and to sort-based aggregation once it has
- |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
- |
+ |The following aggregation query failed when using TungstenAggregate with
+ |controlled fallback (it falls back to bytes to bytes map once it has processed
+ |${fallbackStartsAt - 1} input rows and to sort-based aggregation once it has
+ |processed $fallbackStartsAt input rows). The query is ${actual.queryExecution}
+ |
|$errorMessage
""".stripMargin