aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-08-01 13:20:26 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-08-01 13:20:26 -0700
commit3d1535d48822281953de1e8447de86fad728412a (patch)
tree52f12ad351ecc49c7e2563e0c19e6fa4c622476b /sql/catalyst
parentdf733cbeae7a53826e89574af5463fa018329a22 (diff)
downloadspark-3d1535d48822281953de1e8447de86fad728412a.tar.gz
spark-3d1535d48822281953de1e8447de86fad728412a.tar.bz2
spark-3d1535d48822281953de1e8447de86fad728412a.zip
[SPARK-9520] [SQL] Support in-place sort in UnsafeFixedWidthAggregationMap
This pull request adds a sortedIterator method to UnsafeFixedWidthAggregationMap that sorts its data in-place by the grouping key. This is needed so we can fall back to external sorting for aggregation. Author: Reynold Xin <rxin@databricks.com> Closes #7849 from rxin/bytes2bytes-sorting and squashes the following commits: 75018c6 [Reynold Xin] Updated documentation. 81a8694 [Reynold Xin] [SPARK-9520][SQL] Support in-place sort in UnsafeFixedWidthAggregationMap.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala2
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala12
2 files changed, 13 insertions, 1 deletion
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index d79325aea8..000be70f17 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -125,6 +125,8 @@ object UnsafeProjection {
GenerateUnsafeProjection.generate(exprs)
}
+ def create(expr: Expression): UnsafeProjection = create(Seq(expr))
+
/**
* Returns an UnsafeProjection for given sequence of Expressions, which will be bound to
* `inputSchema`.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index dbd4616d28..cc848aa199 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -21,6 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.Private
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types.StructType
/**
* Inherits some default implementation for Java from `Ordering[Row]`
@@ -43,7 +44,16 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
protected def bind(in: Seq[SortOrder], inputSchema: Seq[Attribute]): Seq[SortOrder] =
in.map(BindReferences.bindReference(_, inputSchema))
- protected def create(ordering: Seq[SortOrder]): Ordering[InternalRow] = {
+ /**
+ * Creates a code gen ordering for sorting this schema, in ascending order.
+ */
+ def create(schema: StructType): BaseOrdering = {
+ create(schema.zipWithIndex.map { case (field, ordinal) =>
+ SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending)
+ })
+ }
+
+ protected def create(ordering: Seq[SortOrder]): BaseOrdering = {
val ctx = newCodeGenContext()
val comparisons = ordering.map { order =>