author    Takuya UESHIN <ueshin@happy-camper.st>    2016-02-16 10:54:44 -0800
committer Josh Rosen <joshrosen@databricks.com>    2016-02-16 10:54:44 -0800
commit    19dc69de795eb08f3bab4988ad88732bf8ca7bae
tree      f680e8161af1d13ee2f9a46d5b9c23d67f7bf6f8
parent    00c72d27bf2e3591c4068fb344fa3edf1662ad81
[SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.

Add `LazilyGeneratedOrdering` to support a generated ordering for the `RangePartitioner` of `Exchange`, instead of `InterpretedOrdering`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #10894 from ueshin/issues/SPARK-12976.
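The change relies on a standard Java-serialization pattern: keep the expensive, non-serializable artifact (here, the code-generated comparator) in a @transient field, and rebuild it from the serializable recipe (the bound `SortOrder` expressions) in a private `readObject` hook after deserialization. A minimal sketch of that pattern in plain Scala, with all names hypothetical and a trivial function standing in for the generated comparator:

    import java.io.ObjectInputStream

    // `spec` is the serializable recipe; `compiled` is rebuilt from it on every
    // JVM that deserializes this object, so it never has to be serialized itself.
    class LazilyCompiled(val spec: Seq[Int]) extends Serializable {

      @transient private[this] var compiled: (Int, Int) => Int = compile(spec)

      // Stand-in for code generation: descending if the first spec entry is negative.
      private def compile(s: Seq[Int]): (Int, Int) => Int =
        (a, b) => if (s.headOption.exists(_ < 0)) b.compare(a) else a.compare(b)

      def compare(a: Int, b: Int): Int = compiled(a, b)

      // Java serialization hook: restore the default fields (`spec`), then
      // recompile the transient comparator instead of reading it from the stream.
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        compiled = compile(spec)
      }
    }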
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala | 37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala | 7
3 files changed, 42 insertions(+), 8 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 6de57537ec..5756f201fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import java.io.ObjectInputStream
+
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
/**
* Inherits some default implementation for Java from `Ordering[Row]`
@@ -138,3 +141,37 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
}
}
+
+/**
+ * A lazily generated row ordering comparator.
+ */
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+ def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+ this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+ @transient
+ private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
+
+ def compare(a: InternalRow, b: InternalRow): Int = {
+ generatedOrdering.compare(a, b)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ in.defaultReadObject()
+ generatedOrdering = GenerateOrdering.generate(ordering)
+ }
+}
+
+object LazilyGeneratedOrdering {
+
+ /**
+ * Creates a [[LazilyGeneratedOrdering]] for the given schema, in natural ascending order.
+ */
+ def forSchema(schema: StructType): LazilyGeneratedOrdering = {
+ new LazilyGeneratedOrdering(schema.zipWithIndex.map {
+ case (field, ordinal) =>
+ SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending)
+ })
+ }
+}
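A hypothetical driver-side use of the new class, assuming the catalyst classpath of this change (the snippet is illustrative and not part of the patch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    // Natural ascending ordering over a single int column; the comparator is
    // code-generated at construction and regenerated after deserialization.
    val schema = StructType(StructField("a", IntegerType) :: Nil)
    val ord = LazilyGeneratedOrdering.forSchema(schema)
    assert(ord.compare(InternalRow(1), InternalRow(2)) < 0)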
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 97f65f18bf..e30adefc69 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -206,10 +207,7 @@ object Exchange {
val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
}
- // We need to use an interpreted ordering here because generated orderings cannot be
- // serialized and this ordering needs to be created on the driver in order to be passed into
- // Spark core code.
- implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes)
+ implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
new RangePartitioner(numPartitions, rddForSampling, ascending = true)
case SinglePartition =>
new Partitioner {
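The comment removed above stated the old constraint: the ordering is created on the driver and shipped inside `RangePartitioner` to executors, so it must survive serialization. A hedged sketch of that contract, using a plain `Ordering[Int]` in place of the generated ordering (the helper name is illustrative):

    import org.apache.spark.{RangePartitioner, SparkContext}

    // The implicit Ordering[K] resolved here (Ordering.Int) is captured by the
    // partitioner and serialized into tasks -- the requirement that
    // LazilyGeneratedOrdering now satisfies for generated orderings.
    def rangePartition(sc: SparkContext): Int = {
      val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))
      val partitioner = new RangePartitioner(2, pairs, ascending = true)
      partitioner.numPartitions
    }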
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 04daf9d0ce..ef76847bcb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
@@ -88,11 +89,8 @@ case class TakeOrderedAndProject(
override def outputPartitioning: Partitioning = SinglePartition
- // We need to use an interpreted ordering here because generated orderings cannot be serialized
- // and this ordering needs to be created on the driver in order to be passed into Spark core code.
- private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
-
override def executeCollect(): Array[InternalRow] = {
+ val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
if (projectList.isDefined) {
val proj = UnsafeProjection.create(projectList.get, child.output)
@@ -105,6 +103,7 @@ case class TakeOrderedAndProject(
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
+ val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val localTopK: RDD[InternalRow] = {
child.execute().map(_.copy()).mapPartitions { iter =>
org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
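`takeOrdered` likewise serializes its `Ordering` into every task and merges the per-partition results, which is why `TakeOrderedAndProject` can now build the ordering per call instead of caching an interpreted one in a field. A small stand-alone illustration with Spark core (not from the patch):

    import org.apache.spark.SparkContext

    // Each task computes a local top-k with the given ordering; the driver merges them.
    def topK(sc: SparkContext, k: Int): Array[Int] =
      sc.parallelize(1 to 100).takeOrdered(k)(Ordering.Int.reverse)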