From 9c878923db6634effed98c99bf24dd263bb7c6ad Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 5 Aug 2015 16:33:42 -0700
Subject: [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use
 newOrdering in SMJ

This patch renames `RowOrdering` to `InterpretedOrdering` and updates
SortMergeJoin to use the `SparkPlan` methods for constructing its ordering
so that it may benefit from codegen.

This is an updated version of #7408.

Author: Josh Rosen

Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits:

e610655 [Josh Rosen] Add comment RE: Ascending ordering
34b8e0c [Josh Rosen] Import ordering
be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
---
 .../sql/catalyst/expressions/RowOrdering.scala     | 93 ---------------------
 .../sql/catalyst/expressions/arithmetic.scala      |  4 +-
 .../sql/catalyst/expressions/conditionals.scala    |  4 +-
 .../spark/sql/catalyst/expressions/ordering.scala  | 96 ++++++++++++++++++++++
 .../sql/catalyst/expressions/predicates.scala      |  8 +-
 .../apache/spark/sql/catalyst/util/TypeUtils.scala |  4 +-
 .../org/apache/spark/sql/types/StructType.scala    |  4 +-
 .../catalyst/expressions/CodeGenerationSuite.scala |  2 +-
 .../org/apache/spark/sql/execution/Exchange.scala  |  5 +-
 .../org/apache/spark/sql/execution/SparkPlan.scala | 14 +++-
 .../spark/sql/execution/basicOperators.scala       |  4 +-
 .../spark/sql/execution/joins/SortMergeJoin.scala  |  9 +-
 .../execution/UnsafeKVExternalSorterSuite.scala    |  6 +-
 13 files changed, 136 insertions(+), 117 deletions(-)
 delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
deleted file mode 100644
index 873f5324c5..0000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types._
-
-
-/**
- * An interpreted row ordering comparator.
- */
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
-
-  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
-    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
-
-  def compare(a: InternalRow, b: InternalRow): Int = {
-    var i = 0
-    while (i < ordering.size) {
-      val order = ordering(i)
-      val left = order.child.eval(a)
-      val right = order.child.eval(b)
-
-      if (left == null && right == null) {
-        // Both null, continue looking.
-      } else if (left == null) {
-        return if (order.direction == Ascending) -1 else 1
-      } else if (right == null) {
-        return if (order.direction == Ascending) 1 else -1
-      } else {
-        val comparison = order.dataType match {
-          case dt: AtomicType if order.direction == Ascending =>
-            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case dt: AtomicType if order.direction == Descending =>
-            dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case s: StructType if order.direction == Ascending =>
-            s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case s: StructType if order.direction == Descending =>
-            s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
-          case other =>
-            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
-        }
-        if (comparison != 0) {
-          return comparison
-        }
-      }
-      i += 1
-    }
-    return 0
-  }
-}
-
-object RowOrdering {
-
-  /**
-   * Returns true iff the data type can be ordered (i.e. can be sorted).
-   */
-  def isOrderable(dataType: DataType): Boolean = dataType match {
-    case NullType => true
-    case dt: AtomicType => true
-    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
-    case _ => false
-  }
-
-  /**
-   * Returns true iff outputs from the expressions can be ordered.
-   */
-  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
-
-  /**
-   * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
-   */
-  def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
-    new RowOrdering(dataTypes.zipWithIndex.map {
-      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
-    })
-  }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 5808e3f66d..98464edf4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
 
   override def nullable: Boolean = left.nullable && right.nullable
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def eval(input: InternalRow): Any = {
     val input1 = left.eval(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 961b1d8616..d51f3d3cef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
   override def nullable: Boolean = children.forall(_.nullable)
   override def foldable: Boolean = children.forall(_.foldable)
 
-  private lazy val ordering = TypeUtils.getOrdering(dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
 
   override def checkInputDataTypes(): TypeCheckResult = {
     if (children.length <= 1) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
new file mode 100644
index 0000000000..6407c73bc9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted row ordering comparator.
+ */
+class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+  def compare(a: InternalRow, b: InternalRow): Int = {
+    var i = 0
+    while (i < ordering.size) {
+      val order = ordering(i)
+      val left = order.child.eval(a)
+      val right = order.child.eval(b)
+
+      if (left == null && right == null) {
+        // Both null, continue looking.
+      } else if (left == null) {
+        return if (order.direction == Ascending) -1 else 1
+      } else if (right == null) {
+        return if (order.direction == Ascending) 1 else -1
+      } else {
+        val comparison = order.dataType match {
+          case dt: AtomicType if order.direction == Ascending =>
+            dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+          case dt: AtomicType if order.direction == Descending =>
+            dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+          case s: StructType if order.direction == Ascending =>
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
+          case s: StructType if order.direction == Descending =>
+            s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+          case other =>
+            throw new IllegalArgumentException(s"Type $other does not support ordered operations")
+        }
+        if (comparison != 0) {
+          return comparison
+        }
+      }
+      i += 1
+    }
+    return 0
+  }
+}
+
+object InterpretedOrdering {
+
+  /**
+   * Creates an [[InterpretedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
+    new InterpretedOrdering(dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    })
+  }
+}
+
+object RowOrdering {
+
+  /**
+   * Returns true iff the data type can be ordered (i.e. can be sorted).
+   */
+  def isOrderable(dataType: DataType): Boolean = dataType match {
+    case NullType => true
+    case dt: AtomicType => true
+    case struct: StructType => struct.fields.forall(f => isOrderable(f.dataType))
+    case _ => false
+  }
+
+  /**
+   * Returns true iff outputs from the expressions can be ordered.
+   */
+  def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 68c832d719..fe7dffb815 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryCompariso
 
   override def symbol: String = "<"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
 }
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
 
   override def symbol: String = "<="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
 }
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryCompar
 
   override def symbol: String = ">"
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
 }
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
 
   override def symbol: String = ">="
 
-  private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+  private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
 
   protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 0b41f92c61..bcf4d78fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -54,10 +54,10 @@ object TypeUtils {
   def getNumeric(t: DataType): Numeric[Any] =
     t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
 
-  def getOrdering(t: DataType): Ordering[Any] = {
+  def getInterpretedOrdering(t: DataType): Ordering[Any] = {
     t match {
       case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
-      case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
+      case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
     }
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 6928707f7b..9cbc207538 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute}
 
 
 /**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     StructType(newFields)
   }
 
-  private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
+  private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
 }
 
 object StructType extends AbstractDataType {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index cc82f7c3f5..e310aee221 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
   // GenerateOrdering agrees with RowOrdering.
   (DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
     test(s"GenerateOrdering with $dataType") {
-      val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
+      val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
       val genOrdering = GenerateOrdering.generate(
         BoundReference(0, dataType, nullable = true).asc ::
           BoundReference(1, dataType, nullable = true).asc :: Nil)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 05b009d193..6ea5eeedf1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -156,7 +156,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        implicit val ordering = new RowOrdering(sortingExpressions, child.output)
+        // We need to use an interpreted ordering here because generated orderings cannot be
+        // serialized and this ordering needs to be created on the driver in order to be passed
+        // into Spark core code.
+        implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index dbc0cefbe2..2f29067f56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.types.DataType
 
 object SparkPlan {
   protected[sql] val currentContext = new ThreadLocal[SQLContext]()
@@ -309,13 +310,22 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
           throw e
         } else {
           log.error("Failed to generate ordering, fallback to interpreted", e)
-          new RowOrdering(order, inputSchema)
+          new InterpretedOrdering(order, inputSchema)
         }
       }
     } else {
-      new RowOrdering(order, inputSchema)
+      new InterpretedOrdering(order, inputSchema)
     }
   }
+
+  /**
+   * Creates a row ordering for the given schema, in natural ascending order.
+   */
+  protected def newNaturalAscendingOrdering(dataTypes: Seq[DataType]): Ordering[InternalRow] = {
+    val order: Seq[SortOrder] = dataTypes.zipWithIndex.map {
+      case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+    }
+    newOrdering(order, Seq.empty)
+  }
 }
 
 private[sql] trait LeafNode extends SparkPlan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 477170297c..f4677b4ee8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -212,7 +212,9 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
+  // We need to use an interpreted ordering here because generated orderings cannot be serialized
+  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
+  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
 
   // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
   @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index eb595490fb..4ae23c186c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -48,9 +48,6 @@ case class SortMergeJoin(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  // this is to manually construct an ordering that can be used to compare keys from both sides
-  private val keyOrdering: RowOrdering = RowOrdering.forSchema(leftKeys.map(_.dataType))
-
   override def outputOrdering: Seq[SortOrder] = requiredOrders(leftKeys)
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
@@ -59,8 +56,10 @@ case class SortMergeJoin(
   @transient protected lazy val leftKeyGenerator = newProjection(leftKeys, left.output)
   @transient protected lazy val rightKeyGenerator = newProjection(rightKeys, right.output)
 
-  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
+  private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] = {
+    // This must be ascending in order to agree with the `keyOrdering` defined in `doExecute()`.
     keys.map(SortOrder(_, Ascending))
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val leftResults = left.execute().map(_.copy())
@@ -68,6 +67,8 @@ case class SortMergeJoin(
 
     leftResults.zipPartitions(rightResults) { (leftIter, rightIter) =>
       new Iterator[InternalRow] {
+        // An ordering that can be used to compare keys from both sides.
+        private[this] val keyOrdering = newNaturalAscendingOrdering(leftKeys.map(_.dataType))
         // Mutable per row objects.
         private[this] val joinRow = new JoinedRow
         private[this] var leftElement: InternalRow = _
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 08156f0e39..a9515a03ac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, RowOrdering, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator, TaskMemoryManager}
@@ -144,8 +144,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite {
     }
     sorter.cleanupResources()
 
-    val keyOrdering = RowOrdering.forSchema(keySchema.map(_.dataType))
-    val valueOrdering = RowOrdering.forSchema(valueSchema.map(_.dataType))
+    val keyOrdering = InterpretedOrdering.forSchema(keySchema.map(_.dataType))
+    val valueOrdering = InterpretedOrdering.forSchema(valueSchema.map(_.dataType))
     val kvOrdering = new Ordering[(InternalRow, InternalRow)] {
      override def compare(x: (InternalRow, InternalRow), y: (InternalRow, InternalRow)): Int = {
        keyOrdering.compare(x._1, y._1) match {
-- 
cgit v1.2.3
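
A minimal sketch of how the renamed API is exercised, for reference; it is not
part of the patch itself. It assumes the Spark 1.5-era catalyst internals shown
above (`InternalRow.apply`, `BoundReference`, `SortOrder`), so treat the exact
constructors as illustrative rather than a stable public API:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{Ascending, BoundReference, InterpretedOrdering, SortOrder}
    import org.apache.spark.sql.types.IntegerType

    object InterpretedOrderingSketch {
      def main(args: Array[String]): Unit = {
        // Compare rows on column 0 (Int, ascending) via the interpreted
        // (non-codegen) comparator that this patch renames from RowOrdering.
        val ordering = new InterpretedOrdering(
          SortOrder(BoundReference(0, IntegerType, nullable = true), Ascending) :: Nil)
        assert(ordering.compare(InternalRow(1), InternalRow(2)) < 0)

        // forSchema orders on every column in natural ascending order. It mirrors
        // what SparkPlan.newNaturalAscendingOrdering builds, except that the latter
        // goes through newOrdering and can therefore use a generated comparator.
        val schemaOrdering = InterpretedOrdering.forSchema(Seq(IntegerType, IntegerType))
        assert(schemaOrdering.compare(InternalRow(1, 5), InternalRow(1, 9)) < 0)
      }
    }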