aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src
diff options
context:
space:
mode:
authorSameer Agarwal <sameer@databricks.com>2016-04-14 20:57:03 -0700
committerYin Huai <yhuai@databricks.com>2016-04-14 20:57:03 -0700
commitb5c60bcdca3bcace607b204a6c196a5386e8a896 (patch)
treec21ab9836b0624bae41daa23e14de04dc1de7179 /sql/core/src
parentff9ae61a3b7bbbfc2aac93a99c05a9e1ea9c08bc (diff)
downloadspark-b5c60bcdca3bcace607b204a6c196a5386e8a896.tar.gz
spark-b5c60bcdca3bcace607b204a6c196a5386e8a896.tar.bz2
spark-b5c60bcdca3bcace607b204a6c196a5386e8a896.zip
[SPARK-14447][SQL] Speed up TungstenAggregate w/ keys using VectorizedHashMap
## What changes were proposed in this pull request? This patch speeds up group-by aggregates by around 3-5x by leveraging an in-memory `AggregateHashMap` (please see https://github.com/apache/spark/pull/12161), an append-only aggregate hash map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates (and fall back to the `BytesToBytesMap` if a given key isn't found). Architecturally, it is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the key-value pairs. The index lookups in the array rely on linear probing (with a small number of maximum tries) and use an inexpensive hash function which makes it really efficient for a majority of lookups. However, using linear probing and an inexpensive hash function also makes it less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even for certain distribution of keys) and requires us to fall back on the latter for correctness. ## How was this patch tested? Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4 Intel(R) Core(TM) i7-4960HQ CPU 2.60GHz Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- codegen = F 2124 / 2204 9.9 101.3 1.0X codegen = T hashmap = F 1198 / 1364 17.5 57.1 1.8X codegen = T hashmap = T 369 / 600 56.8 17.6 5.8X Author: Sameer Agarwal <sameer@databricks.com> Closes #12345 from sameeragarwal/tungsten-aggregate-integration.
Diffstat (limited to 'sql/core/src')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala1
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala227
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala (renamed from sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala)86
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala34
6 files changed, 279 insertions, 86 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 447dbe7018..29acc38ab3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -126,6 +126,7 @@ trait CodegenSupport extends SparkPlan {
// outputVars will be used to generate the code for UnsafeRow, so we should copy them
outputVars.map(_.copy())
}
+
val rowVar = if (row != null) {
ExprCode("", "false", row)
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 253592028c..f585759e58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -70,12 +70,14 @@ case class TungstenAggregate(
}
}
- // This is for testing. We force TungstenAggregationIterator to fall back to sort-based
- // aggregation once it has processed a given number of input rows.
- private val testFallbackStartsAt: Option[Int] = {
+ // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash
+ // map and/or the sort-based aggregation once it has processed a given number of input rows.
+ private val testFallbackStartsAt: Option[(Int, Int)] = {
sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match {
case null | "" => None
- case fallbackStartsAt => Some(fallbackStartsAt.toInt)
+ case fallbackStartsAt =>
+ val splits = fallbackStartsAt.split(",").map(_.trim)
+ Some((splits.head.toInt, splits.last.toInt))
}
}
@@ -261,7 +263,15 @@ case class TungstenAggregate(
.map(_.asInstanceOf[DeclarativeAggregate])
private val bufferSchema = StructType.fromAttributes(aggregateBufferAttributes)
- // The name for HashMap
+ // The name for Vectorized HashMap
+ private var vectorizedHashMapTerm: String = _
+
+ // We currently only enable vectorized hashmap for long key/value types and partial aggregates
+ private val isVectorizedHashMapEnabled: Boolean = sqlContext.conf.columnarAggregateMapEnabled &&
+ (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType) &&
+ modes.forall(mode => mode == Partial || mode == PartialMerge)
+
+ // The name for UnsafeRow HashMap
private var hashMapTerm: String = _
private var sorterTerm: String = _
@@ -437,17 +447,18 @@ case class TungstenAggregate(
val initAgg = ctx.freshName("initAgg")
ctx.addMutableState("boolean", initAgg, s"$initAgg = false;")
- // create AggregateHashMap
- val isAggregateHashMapEnabled: Boolean = false
- val isAggregateHashMapSupported: Boolean =
- (groupingKeySchema ++ bufferSchema).forall(_.dataType == LongType)
- val aggregateHashMapTerm = ctx.freshName("aggregateHashMap")
- val aggregateHashMapClassName = ctx.freshName("GeneratedAggregateHashMap")
- val aggregateHashMapGenerator = new ColumnarAggMapCodeGenerator(ctx, aggregateHashMapClassName,
+ vectorizedHashMapTerm = ctx.freshName("vectorizedHashMap")
+ val vectorizedHashMapClassName = ctx.freshName("VectorizedHashMap")
+ val vectorizedHashMapGenerator = new VectorizedHashMapGenerator(ctx, vectorizedHashMapClassName,
groupingKeySchema, bufferSchema)
- if (isAggregateHashMapEnabled && isAggregateHashMapSupported) {
- ctx.addMutableState(aggregateHashMapClassName, aggregateHashMapTerm,
- s"$aggregateHashMapTerm = new $aggregateHashMapClassName();")
+ // Create a name for iterator from vectorized HashMap
+ val iterTermForVectorizedHashMap = ctx.freshName("vectorizedHashMapIter")
+ if (isVectorizedHashMapEnabled) {
+ ctx.addMutableState(vectorizedHashMapClassName, vectorizedHashMapTerm,
+ s"$vectorizedHashMapTerm = new $vectorizedHashMapClassName();")
+ ctx.addMutableState(
+ "java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>",
+ iterTermForVectorizedHashMap, "")
}
// create hashMap
@@ -465,11 +476,14 @@ case class TungstenAggregate(
val doAgg = ctx.freshName("doAggregateWithKeys")
ctx.addNewFunction(doAgg,
s"""
- ${if (isAggregateHashMapSupported) aggregateHashMapGenerator.generate() else ""}
+ ${if (isVectorizedHashMapEnabled) vectorizedHashMapGenerator.generate() else ""}
private void $doAgg() throws java.io.IOException {
$hashMapTerm = $thisPlan.createHashMap();
${child.asInstanceOf[CodegenSupport].produce(ctx, this)}
+ ${if (isVectorizedHashMapEnabled) {
+ s"$iterTermForVectorizedHashMap = $vectorizedHashMapTerm.rowIterator();"} else ""}
+
$iterTerm = $thisPlan.finishAggregate($hashMapTerm, $sorterTerm);
}
""")
@@ -484,6 +498,34 @@ case class TungstenAggregate(
// so `copyResult` should be reset to `false`.
ctx.copyResult = false
+ // Iterate over the aggregate rows and convert them from ColumnarBatch.Row to UnsafeRow
+ def outputFromGeneratedMap: Option[String] = {
+ if (isVectorizedHashMapEnabled) {
+ val row = ctx.freshName("vectorizedHashMapRow")
+ ctx.currentVars = null
+ ctx.INPUT_ROW = row
+ var schema: StructType = groupingKeySchema
+ bufferSchema.foreach(i => schema = schema.add(i))
+ val generateRow = GenerateUnsafeProjection.createCode(ctx, schema.toAttributes.zipWithIndex
+ .map { case (attr, i) => BoundReference(i, attr.dataType, attr.nullable) })
+ Option(
+ s"""
+ | while ($iterTermForVectorizedHashMap.hasNext()) {
+ | $numOutput.add(1);
+ | org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $row =
+ | (org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row)
+ | $iterTermForVectorizedHashMap.next();
+ | ${generateRow.code}
+ | ${consume(ctx, Seq.empty, {generateRow.value})}
+ |
+ | if (shouldStop()) return;
+ | }
+ |
+ | $vectorizedHashMapTerm.close();
+ """.stripMargin)
+ } else None
+ }
+
s"""
if (!$initAgg) {
$initAgg = true;
@@ -491,6 +533,8 @@ case class TungstenAggregate(
}
// output the result
+ ${outputFromGeneratedMap.getOrElse("")}
+
while ($iterTerm.next()) {
$numOutput.add(1);
UnsafeRow $keyTerm = (UnsafeRow) $iterTerm.getKey();
@@ -511,10 +555,13 @@ case class TungstenAggregate(
// create grouping key
ctx.currentVars = input
- val keyCode = GenerateUnsafeProjection.createCode(
+ val unsafeRowKeyCode = GenerateUnsafeProjection.createCode(
ctx, groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output)))
- val key = keyCode.value
- val buffer = ctx.freshName("aggBuffer")
+ val vectorizedRowKeys = ctx.generateExpressions(
+ groupingExpressions.map(e => BindReferences.bindReference[Expression](e, child.output)))
+ val unsafeRowKeys = unsafeRowKeyCode.value
+ val unsafeRowBuffer = ctx.freshName("unsafeRowAggBuffer")
+ val vectorizedRowBuffer = ctx.freshName("vectorizedAggBuffer")
// only have DeclarativeAggregate
val updateExpr = aggregateExpressions.flatMap { e =>
@@ -533,56 +580,124 @@ case class TungstenAggregate(
val inputAttr = aggregateBufferAttributes ++ child.output
ctx.currentVars = new Array[ExprCode](aggregateBufferAttributes.length) ++ input
- ctx.INPUT_ROW = buffer
- // TODO: support subexpression elimination
- val evals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
- val updates = evals.zipWithIndex.map { case (ev, i) =>
- val dt = updateExpr(i).dataType
- ctx.updateColumn(buffer, dt, i, ev, updateExpr(i).nullable)
- }
- val (checkFallback, resetCoulter, incCounter) = if (testFallbackStartsAt.isDefined) {
+ val (checkFallbackForGeneratedHashMap, checkFallbackForBytesToBytesMap, resetCounter,
+ incCounter) = if (testFallbackStartsAt.isDefined) {
val countTerm = ctx.freshName("fallbackCounter")
ctx.addMutableState("int", countTerm, s"$countTerm = 0;")
- (s"$countTerm < ${testFallbackStartsAt.get}", s"$countTerm = 0;", s"$countTerm += 1;")
+ (s"$countTerm < ${testFallbackStartsAt.get._1}",
+ s"$countTerm < ${testFallbackStartsAt.get._2}", s"$countTerm = 0;", s"$countTerm += 1;")
} else {
- ("true", "", "")
+ ("true", "true", "", "")
}
+ // We first generate code to probe and update the vectorized hash map. If the probe is
+ // successful the corresponding vectorized row buffer will hold the mutable row
+ val findOrInsertInVectorizedHashMap: Option[String] = {
+ if (isVectorizedHashMapEnabled) {
+ Option(
+ s"""
+ |if ($checkFallbackForGeneratedHashMap) {
+ | ${vectorizedRowKeys.map(_.code).mkString("\n")}
+ | if (${vectorizedRowKeys.map("!" + _.isNull).mkString(" && ")}) {
+ | $vectorizedRowBuffer = $vectorizedHashMapTerm.findOrInsert(
+ | ${vectorizedRowKeys.map(_.value).mkString(", ")});
+ | }
+ |}
+ """.stripMargin)
+ } else {
+ None
+ }
+ }
+
+ val updateRowInVectorizedHashMap: Option[String] = {
+ if (isVectorizedHashMapEnabled) {
+ ctx.INPUT_ROW = vectorizedRowBuffer
+ val vectorizedRowEvals = updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+ val updateVectorizedRow = vectorizedRowEvals.zipWithIndex.map { case (ev, i) =>
+ val dt = updateExpr(i).dataType
+ ctx.updateColumn(vectorizedRowBuffer, dt, i, ev, updateExpr(i).nullable)
+ }
+ Option(
+ s"""
+ |// evaluate aggregate function
+ |${evaluateVariables(vectorizedRowEvals)}
+ |// update vectorized row
+ |${updateVectorizedRow.mkString("\n").trim}
+ """.stripMargin)
+ } else None
+ }
+
+ // Next, we generate code to probe and update the unsafe row hash map.
+ val findOrInsertInUnsafeRowMap: String = {
+ s"""
+ | if ($vectorizedRowBuffer == null) {
+ | // generate grouping key
+ | ${unsafeRowKeyCode.code.trim}
+ | ${hashEval.code.trim}
+ | if ($checkFallbackForBytesToBytesMap) {
+ | // try to get the buffer from hash map
+ | $unsafeRowBuffer =
+ | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+ | }
+ | if ($unsafeRowBuffer == null) {
+ | if ($sorterTerm == null) {
+ | $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
+ | } else {
+ | $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
+ | }
+ | $resetCounter
+ | // the hash map had be spilled, it should have enough memory now,
+ | // try to allocate buffer again.
+ | $unsafeRowBuffer =
+ | $hashMapTerm.getAggregationBufferFromUnsafeRow($unsafeRowKeys, ${hashEval.value});
+ | if ($unsafeRowBuffer == null) {
+ | // failed to allocate the first page
+ | throw new OutOfMemoryError("No enough memory for aggregation");
+ | }
+ | }
+ | }
+ """.stripMargin
+ }
+
+ val updateRowInUnsafeRowMap: String = {
+ ctx.INPUT_ROW = unsafeRowBuffer
+ val unsafeRowBufferEvals =
+ updateExpr.map(BindReferences.bindReference(_, inputAttr).gen(ctx))
+ val updateUnsafeRowBuffer = unsafeRowBufferEvals.zipWithIndex.map { case (ev, i) =>
+ val dt = updateExpr(i).dataType
+ ctx.updateColumn(unsafeRowBuffer, dt, i, ev, updateExpr(i).nullable)
+ }
+ s"""
+ |// evaluate aggregate function
+ |${evaluateVariables(unsafeRowBufferEvals)}
+ |// update unsafe row buffer
+ |${updateUnsafeRowBuffer.mkString("\n").trim}
+ """.stripMargin
+ }
+
+
// We try to do hash map based in-memory aggregation first. If there is not enough memory (the
// hash map will return null for new key), we spill the hash map to disk to free memory, then
// continue to do in-memory aggregation and spilling until all the rows had been processed.
// Finally, sort the spilled aggregate buffers by key, and merge them together for same key.
s"""
- // generate grouping key
- ${keyCode.code.trim}
- ${hashEval.code.trim}
- UnsafeRow $buffer = null;
- if ($checkFallback) {
- // try to get the buffer from hash map
- $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
- }
- if ($buffer == null) {
- if ($sorterTerm == null) {
- $sorterTerm = $hashMapTerm.destructAndCreateExternalSorter();
- } else {
- $sorterTerm.merge($hashMapTerm.destructAndCreateExternalSorter());
- }
- $resetCoulter
- // the hash map had be spilled, it should have enough memory now,
- // try to allocate buffer again.
- $buffer = $hashMapTerm.getAggregationBufferFromUnsafeRow($key, ${hashEval.value});
- if ($buffer == null) {
- // failed to allocate the first page
- throw new OutOfMemoryError("No enough memory for aggregation");
- }
- }
+ UnsafeRow $unsafeRowBuffer = null;
+ org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row $vectorizedRowBuffer = null;
+
+ ${findOrInsertInVectorizedHashMap.getOrElse("")}
+
+ $findOrInsertInUnsafeRowMap
+
$incCounter
- // evaluate aggregate function
- ${evaluateVariables(evals)}
- // update aggregate buffer
- ${updates.mkString("\n").trim}
+ if ($vectorizedRowBuffer != null) {
+ // update vectorized row
+ ${updateRowInVectorizedHashMap.getOrElse("")}
+ } else {
+ // update unsafe row
+ $updateRowInUnsafeRowMap
+ }
"""
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index ce504e20e6..09384a482d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -85,7 +85,7 @@ class TungstenAggregationIterator(
newMutableProjection: (Seq[Expression], Seq[Attribute]) => (() => MutableProjection),
originalInputAttributes: Seq[Attribute],
inputIter: Iterator[InternalRow],
- testFallbackStartsAt: Option[Int],
+ testFallbackStartsAt: Option[(Int, Int)],
numOutputRows: LongSQLMetric,
dataSize: LongSQLMetric,
spillSize: LongSQLMetric)
@@ -171,7 +171,7 @@ class TungstenAggregationIterator(
// hashMap. If there is not enough memory, it will multiple hash-maps, spilling
// after each becomes full then using sort to merge these spills, finally do sort
// based aggregation.
- private def processInputs(fallbackStartsAt: Int): Unit = {
+ private def processInputs(fallbackStartsAt: (Int, Int)): Unit = {
if (groupingExpressions.isEmpty) {
// If there is no grouping expressions, we can just reuse the same buffer over and over again.
// Note that it would be better to eliminate the hash map entirely in the future.
@@ -187,7 +187,7 @@ class TungstenAggregationIterator(
val newInput = inputIter.next()
val groupingKey = groupingProjection.apply(newInput)
var buffer: UnsafeRow = null
- if (i < fallbackStartsAt) {
+ if (i < fallbackStartsAt._2) {
buffer = hashMap.getAggregationBufferFromUnsafeRow(groupingKey)
}
if (buffer == null) {
@@ -352,7 +352,7 @@ class TungstenAggregationIterator(
/**
* Start processing input rows.
*/
- processInputs(testFallbackStartsAt.getOrElse(Int.MaxValue))
+ processInputs(testFallbackStartsAt.getOrElse((Int.MaxValue, Int.MaxValue)))
// If we did not switch to sort-based aggregation in processInputs,
// we pre-load the first key-value pair from the map (to make hasNext idempotent).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index e415dd8e6a..395cc7ab91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ColumnarAggMapCodeGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -21,19 +21,24 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
import org.apache.spark.sql.types.StructType
/**
- * This is a helper object to generate an append-only single-key/single value aggregate hash
- * map that can act as a 'cache' for extremely fast key-value lookups while evaluating aggregates
- * (and fall back to the `BytesToBytesMap` if a given key isn't found). This is 'codegened' in
- * TungstenAggregate to speed up aggregates w/ key.
+ * This is a helper class to generate an append-only vectorized hash map that can act as a 'cache'
+ * for extremely fast key-value lookups while evaluating aggregates (and fall back to the
+ * `BytesToBytesMap` if a given key isn't found). This is 'codegened' in TungstenAggregate to speed
+ * up aggregates w/ key.
*
* It is backed by a power-of-2-sized array for index lookups and a columnar batch that stores the
* key-value pairs. The index lookups in the array rely on linear probing (with a small number of
* maximum tries) and use an inexpensive hash function which makes it really efficient for a
* majority of lookups. However, using linear probing and an inexpensive hash function also makes it
* less robust as compared to the `BytesToBytesMap` (especially for a large number of keys or even
- * for certain distribution of keys) and requires us to fall back on the latter for correctness.
+ * for certain distribution of keys) and requires us to fall back on the latter for correctness. We
+ * also use a secondary columnar batch that logically projects over the original columnar batch and
+ * is equivalent to the `BytesToBytesMap` aggregate buffer.
+ *
+ * NOTE: This vectorized hash map currently doesn't support nullable keys and falls back to the
+ * `BytesToBytesMap` to store them.
*/
-class ColumnarAggMapCodeGenerator(
+class VectorizedHashMapGenerator(
ctx: CodegenContext,
generatedClassName: String,
groupingKeySchema: StructType,
@@ -52,6 +57,10 @@ class ColumnarAggMapCodeGenerator(
|${generateEquals()}
|
|${generateHashFunction()}
+ |
+ |${generateRowIterator()}
+ |
+ |${generateClose()}
|}
""".stripMargin
}
@@ -65,27 +74,47 @@ class ColumnarAggMapCodeGenerator(
.mkString("\n")};
""".stripMargin
+ val generatedAggBufferSchema: String =
+ s"""
+ |new org.apache.spark.sql.types.StructType()
+ |${bufferSchema.map(key =>
+ s""".add("${key.name}", org.apache.spark.sql.types.DataTypes.${key.dataType})""")
+ .mkString("\n")};
+ """.stripMargin
+
s"""
| private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
+ | private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
| private int[] buckets;
| private int numBuckets;
| private int maxSteps;
| private int numRows = 0;
| private org.apache.spark.sql.types.StructType schema = $generatedSchema
+ | private org.apache.spark.sql.types.StructType aggregateBufferSchema =
+ | $generatedAggBufferSchema
+ |
+ | public $generatedClassName() {
+ | // TODO: These should be generated based on the schema
+ | int DEFAULT_CAPACITY = 1 << 16;
+ | double DEFAULT_LOAD_FACTOR = 0.25;
+ | int DEFAULT_MAX_STEPS = 2;
+ | assert (DEFAULT_CAPACITY > 0 && ((DEFAULT_CAPACITY & (DEFAULT_CAPACITY - 1)) == 0));
+ | this.maxSteps = DEFAULT_MAX_STEPS;
+ | numBuckets = (int) (DEFAULT_CAPACITY / DEFAULT_LOAD_FACTOR);
|
- | public $generatedClassName(int capacity, double loadFactor, int maxSteps) {
- | assert (capacity > 0 && ((capacity & (capacity - 1)) == 0));
- | this.maxSteps = maxSteps;
- | numBuckets = (int) (capacity / loadFactor);
| batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
- | org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
+ | org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+ |
+ | // TODO: Possibly generate this projection in TungstenAggregate directly
+ | aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
+ | aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, DEFAULT_CAPACITY);
+ | for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
+ | aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+ | }
+ |
| buckets = new int[numBuckets];
| java.util.Arrays.fill(buckets, -1);
| }
- |
- | public $generatedClassName() {
- | new $generatedClassName(1 << 16, 0.25, 5);
- | }
""".stripMargin
}
@@ -103,7 +132,7 @@ class ColumnarAggMapCodeGenerator(
s"""
|// TODO: Improve this hash function
|private long hash($groupingKeySignature) {
- | return ${groupingKeys.map(_._2).mkString(" ^ ")};
+ | return ${groupingKeys.map(_._2).mkString(" | ")};
|}
""".stripMargin
}
@@ -175,12 +204,14 @@ class ColumnarAggMapCodeGenerator(
| ${groupingKeys.zipWithIndex.map(k =>
s"batch.column(${k._2}).putLong(numRows, ${k._1._2});").mkString("\n")}
| ${bufferValues.zipWithIndex.map(k =>
- s"batch.column(${groupingKeys.length + k._2}).putLong(numRows, 0);")
+ s"batch.column(${groupingKeys.length + k._2}).putNull(numRows);")
.mkString("\n")}
| buckets[idx] = numRows++;
- | return batch.getRow(buckets[idx]);
+ | batch.setNumRows(numRows);
+ | aggregateBufferBatch.setNumRows(numRows);
+ | return aggregateBufferBatch.getRow(buckets[idx]);
| } else if (equals(idx, ${groupingKeys.map(_._2).mkString(", ")})) {
- | return batch.getRow(buckets[idx]);
+ | return aggregateBufferBatch.getRow(buckets[idx]);
| }
| idx = (idx + 1) & (numBuckets - 1);
| step++;
@@ -190,4 +221,21 @@ class ColumnarAggMapCodeGenerator(
|}
""".stripMargin
}
+
+ private def generateRowIterator(): String = {
+ s"""
+ |public java.util.Iterator<org.apache.spark.sql.execution.vectorized.ColumnarBatch.Row>
+ | rowIterator() {
+ | return batch.rowIterator();
+ |}
+ """.stripMargin
+ }
+
+ private def generateClose(): String = {
+ s"""
+ |public void close() {
+ | batch.close();
+ |}
+ """.stripMargin
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2f9d63c2e8..20d9a28548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -436,6 +436,13 @@ object SQLConf {
.stringConf
.createOptional
+ // TODO: This is still WIP and shouldn't be turned on without extensive test coverage
+ val COLUMNAR_AGGREGATE_MAP_ENABLED = SQLConfigBuilder("spark.sql.codegen.aggregate.map.enabled")
+ .internal()
+ .doc("When true, aggregate with keys use an in-memory columnar map to speed up execution.")
+ .booleanConf
+ .createWithDefault(false)
+
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -560,6 +567,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+ def columnarAggregateMapEnabled: Boolean = getConf(COLUMNAR_AGGREGATE_MAP_ENABLED)
+
override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)
override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 352fd07d0e..d23f19c480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -153,16 +153,36 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
ignore("aggregate with keys") {
val N = 20 << 20
- runBenchmark("Aggregate w keys", N) {
- sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+ val benchmark = new Benchmark("Aggregate w keys", N)
+ def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()
+
+ benchmark.addCase(s"codegen = F") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+ f()
+ }
+
+ benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "false")
+ f()
}
+ benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+ sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+ sqlContext.setConf("spark.sql.codegen.aggregate.map.enabled", "true")
+ f()
+ }
+
+ benchmark.run()
+
/*
- Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
- Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
- -------------------------------------------------------------------------------------------
- Aggregate w keys codegen=false 2429 / 2644 8.6 115.8 1.0X
- Aggregate w keys codegen=true 1535 / 1571 13.7 73.2 1.6X
+ Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+ Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+ Aggregate w keys: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
+ -------------------------------------------------------------------------------------------
+ codegen = F 2219 / 2392 9.4 105.8 1.0X
+ codegen = T hashmap = F 1330 / 1466 15.8 63.4 1.7X
+ codegen = T hashmap = T 384 / 518 54.7 18.3 5.8X
*/
}