diff options
author | Takuya UESHIN <ueshin@happy-camper.st> | 2016-02-16 10:54:44 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2016-02-16 10:54:44 -0800 |
commit | 19dc69de795eb08f3bab4988ad88732bf8ca7bae (patch) | |
tree | f680e8161af1d13ee2f9a46d5b9c23d67f7bf6f8 /sql/core | |
parent | 00c72d27bf2e3591c4068fb344fa3edf1662ad81 (diff) | |
download | spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.tar.gz spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.tar.bz2 spark-19dc69de795eb08f3bab4988ad88732bf8ca7bae.zip |
[SPARK-12976][SQL] Add LazilyGenerateOrdering and use it for RangePartitioner of Exchange.
Add `LazilyGenerateOrdering` to support generated ordering for `RangePartitioner` of `Exchange` instead of `InterpretedOrdering`.
Author: Takuya UESHIN <ueshin@happy-camper.st>
Closes #10894 from ueshin/issues/SPARK-12976.
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala | 6 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala | 7 |
2 files changed, 5 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 97f65f18bf..e30adefc69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors.attachTree import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair @@ -206,10 +207,7 @@ object Exchange { val mutablePair = new MutablePair[InternalRow, Null]() iter.map(row => mutablePair.update(row.copy(), null)) } - // We need to use an interpreted ordering here because generated orderings cannot be - // serialized and this ordering needs to be created on the driver in order to be passed into - // Spark core code. - implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes) + implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes) new RangePartitioner(numPartitions, rddForSampling, ascending = true) case SinglePartition => new Partitioner { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 04daf9d0ce..ef76847bcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ @@ -88,11 +89,8 @@ case class TakeOrderedAndProject( override def outputPartitioning: Partitioning = SinglePartition - // We need to use an interpreted ordering here because generated orderings cannot be serialized - // and this ordering needs to be created on the driver in order to be passed into Spark core code. - private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output) - override def executeCollect(): Array[InternalRow] = { + val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) if (projectList.isDefined) { val proj = UnsafeProjection.create(projectList.get, child.output) @@ -105,6 +103,7 @@ case class TakeOrderedAndProject( private val serializer: Serializer = new UnsafeRowSerializer(child.output.size) protected override def doExecute(): RDD[InternalRow] = { + val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord) |