diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2016-02-16 10:54:44 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-02-16 10:54:44 -0800 |
commit | 19dc69de795eb08f3bab4988ad88732bf8ca7bae (patch) | |
tree | f680e8161af1d13ee2f9a46d5b9c23d67f7bf6f8 | |
parent | 00c72d27bf2e3591c4068fb344fa3edf1662ad81 (diff) | |
download | spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.tar.gz spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.tar.bz2 spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.zip |
[SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.
Add `LazilyGeneratedOrdering` to support generated ordering for `RangePartitioner` of `Exchange` instead of `InterpretedOrdering`.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #10894 from ueshin/issues/SPARK-12976.
3 files changed, 42 insertions, 8 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala index 6de57537ec..5756f201fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.catalyst.expressions.codegen +import java.io.ObjectInputStream + import org.apache.spark.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils /** * Inherits some default implementation for Java from `Ordering[Row]` @@ -138,3 +141,37 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering] } } + +/** + * A lazily generated row ordering comparator. + */ +class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] { + + def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) = + this(ordering.map(BindReferences.bindReference(_, inputSchema))) + + @transient + private[this] var generatedOrdering = GenerateOrdering.generate(ordering) + + def compare(a: InternalRow, b: InternalRow): Int = { + generatedOrdering.compare(a, b) + } + + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { + in.defaultReadObject() + generatedOrdering = GenerateOrdering.generate(ordering) + } +} + +object LazilyGeneratedOrdering { + + /** + * Creates a [[LazilyGeneratedOrdering]] for the given schema, in natural ascending order. 
+ */ + def forSchema(schema: StructType): LazilyGeneratedOrdering = { + new LazilyGeneratedOrdering(schema.zipWithIndex.map { + case (field, ordinal) => + SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending) + }) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 97f65f18bf..e30adefc69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair @@ -206,10 +207,7 @@ object Exchange { val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row.copy(), null)) } - // We need to use an interpreted ordering here because generated orderings cannot be - // serialized and this ordering needs to be created on the driver in order to be passed into - // Spark core code. 
- implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes) + implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) new RangePartitioner(numPartitions, rddForSampling, ascending = true) case SinglePartition => new Partitioner { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 04daf9d0ce..ef76847bcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ @@ -88,11 +89,8 @@ case class TakeOrderedAndProject( override def outputPartitioning: Partitioning = SinglePartition - // We need to use an interpreted ordering here because generated orderings cannot be serialized - // and this ordering needs to be created on the driver in order to be passed into Spark core code. 
- private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output) - override def executeCollect(): Array[InternalRow] = { + val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) if (projectList.isDefined) { val proj = UnsafeProjection.create(projectList.get, child.output) @@ -105,6 +103,7 @@ case class TakeOrderedAndProject( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) protected override def doExecute(): RDD[InternalRow] = { + val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord) |