aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
author: Davies Liu <davies@databricks.com> 2016-01-29 20:16:11 -0800
committer: Davies Liu <davies.liu@gmail.com> 2016-01-29 20:16:11 -0800
commit: e6a02c66d53f59ba2d5c1548494ae80a385f9f5c (patch)
tree: 54be709e37f53a6bd7a4768568d4e416e6cc5d7a /sql/catalyst
parent: 12252d1da90fa7d2dffa3a7c249ecc8821dee130 (diff)
download: spark-e6a02c66d53f59ba2d5c1548494ae80a385f9f5c.tar.gz
spark-e6a02c66d53f59ba2d5c1548494ae80a385f9f5c.tar.bz2
spark-e6a02c66d53f59ba2d5c1548494ae80a385f9f5c.zip
[SPARK-12914] [SQL] generate aggregation with grouping keys
This PR adds support for grouping keys in the generated TungstenAggregate. Spilling and performance improvements for BytesToBytesMap will be done in a follow-up PR. Author: Davies Liu <davies@databricks.com> Closes #10855 from davies/gen_keys.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala 47
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala 27
2 files changed, 49 insertions, 25 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index e6704cf8bb..21f9198073 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -56,6 +56,20 @@ class CodegenContext {
val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()
/**
+ * Add an object to `references`, create a class member to access it.
+ *
+ * Returns the name of class member.
+ */
+ def addReferenceObj(name: String, obj: Any, className: String = null): String = {
+ val term = freshName(name)
+ val idx = references.length
+ references += obj
+ val clsName = Option(className).getOrElse(obj.getClass.getName)
+ addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
+ term
+ }
+
+ /**
* Holding a list of generated columns as input of current operator, will be used by
* BoundReference to generate code.
*/
@@ -199,6 +213,39 @@ class CodegenContext {
}
/**
+ * Update a column in MutableRow from ExprCode.
+ */
+ def updateColumn(
+ row: String,
+ dataType: DataType,
+ ordinal: Int,
+ ev: ExprCode,
+ nullable: Boolean): String = {
+ if (nullable) {
+ // Can't call setNullAt on DecimalType, because we need to keep the offset
+ if (dataType.isInstanceOf[DecimalType]) {
+ s"""
+ if (!${ev.isNull}) {
+ ${setColumn(row, dataType, ordinal, ev.value)};
+ } else {
+ ${setColumn(row, dataType, ordinal, "null")};
+ }
+ """
+ } else {
+ s"""
+ if (!${ev.isNull}) {
+ ${setColumn(row, dataType, ordinal, ev.value)};
+ } else {
+ $row.setNullAt($ordinal);
+ }
+ """
+ }
+ } else {
+ s"""${setColumn(row, dataType, ordinal, ev.value)};"""
+ }
+ }
+
+ /**
* Returns the name used in accessor and setter for a Java primitive type.
*/
def primitiveTypeName(jt: String): String = jt match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
index ec31db19b9..5b4dc8df86 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
@@ -88,31 +88,8 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
val updates = validExpr.zip(index).map {
case (e, i) =>
- if (e.nullable) {
- if (e.dataType.isInstanceOf[DecimalType]) {
- // Can't call setNullAt on DecimalType, because we need to keep the offset
- s"""
- if (this.isNull_$i) {
- ${ctx.setColumn("mutableRow", e.dataType, i, "null")};
- } else {
- ${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
- }
- """
- } else {
- s"""
- if (this.isNull_$i) {
- mutableRow.setNullAt($i);
- } else {
- ${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
- }
- """
- }
- } else {
- s"""
- ${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
- """
- }
-
+ val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
+ ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}
val allProjections = ctx.splitExpressions(ctx.INPUT_ROW, projectionCodes)