aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-08-05 16:33:42 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-08-05 16:33:42 -0700
commit9c878923db6634effed98c99bf24dd263bb7c6ad (patch)
treec5c1d459105d5cb4b0c1980ea567d155596bd10d /sql/catalyst
parentdac090d1e9be7dec6c5ebdb2a81105b87e853193 (diff)
downloadspark-9c878923db6634effed98c99bf24dd263bb7c6ad.tar.gz
spark-9c878923db6634effed98c99bf24dd263bb7c6ad.tar.bz2
spark-9c878923db6634effed98c99bf24dd263bb7c6ad.zip
[SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in SMJ
This patches renames `RowOrdering` to `InterpretedOrdering` and updates SortMergeJoin to use the `SparkPlan` methods for constructing its ordering so that it may benefit from codegen. This is an updated version of #7408. Author: Josh Rosen <joshrosen@databricks.com> Closes #7973 from JoshRosen/SPARK-9054 and squashes the following commits: e610655 [Josh Rosen] Add comment RE: Ascending ordering 34b8e0c [Josh Rosen] Import ordering be19a0f [Josh Rosen] [SPARK-9054] [SQL] Rename RowOrdering to InterpretedOrdering; use newOrdering in more places.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala)27
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala8
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala4
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala4
-rw-r--r--sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala2
7 files changed, 28 insertions, 25 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 5808e3f66d..98464edf4d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -320,7 +320,7 @@ case class MaxOf(left: Expression, right: Expression) extends BinaryArithmetic {
override def nullable: Boolean = left.nullable && right.nullable
- private lazy val ordering = TypeUtils.getOrdering(dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
override def eval(input: InternalRow): Any = {
val input1 = left.eval(input)
@@ -374,7 +374,7 @@ case class MinOf(left: Expression, right: Expression) extends BinaryArithmetic {
override def nullable: Boolean = left.nullable && right.nullable
- private lazy val ordering = TypeUtils.getOrdering(dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
override def eval(input: InternalRow): Any = {
val input1 = left.eval(input)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
index 961b1d8616..d51f3d3cef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionals.scala
@@ -319,7 +319,7 @@ case class Least(children: Seq[Expression]) extends Expression {
override def nullable: Boolean = children.forall(_.nullable)
override def foldable: Boolean = children.forall(_.foldable)
- private lazy val ordering = TypeUtils.getOrdering(dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
override def checkInputDataTypes(): TypeCheckResult = {
if (children.length <= 1) {
@@ -374,7 +374,7 @@ case class Greatest(children: Seq[Expression]) extends Expression {
override def nullable: Boolean = children.forall(_.nullable)
override def foldable: Boolean = children.forall(_.foldable)
- private lazy val ordering = TypeUtils.getOrdering(dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(dataType)
override def checkInputDataTypes(): TypeCheckResult = {
if (children.length <= 1) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
index 873f5324c5..6407c73bc9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/RowOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types._
/**
* An interpreted row ordering comparator.
*/
-class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema)))
@@ -49,9 +49,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
case dt: AtomicType if order.direction == Descending =>
dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
case s: StructType if order.direction == Ascending =>
- s.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
+ s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case s: StructType if order.direction == Descending =>
- s.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
+ s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
case other =>
throw new IllegalArgumentException(s"Type $other does not support ordered operations")
}
@@ -65,6 +65,18 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
}
}
+object InterpretedOrdering {
+
+ /**
+ * Creates a [[InterpretedOrdering]] for the given schema, in natural ascending order.
+ */
+ def forSchema(dataTypes: Seq[DataType]): InterpretedOrdering = {
+ new InterpretedOrdering(dataTypes.zipWithIndex.map {
+ case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
+ })
+ }
+}
+
object RowOrdering {
/**
@@ -81,13 +93,4 @@ object RowOrdering {
* Returns true iff outputs from the expressions can be ordered.
*/
def isOrderable(exprs: Seq[Expression]): Boolean = exprs.forall(e => isOrderable(e.dataType))
-
- /**
- * Creates a [[RowOrdering]] for the given schema, in natural ascending order.
- */
- def forSchema(dataTypes: Seq[DataType]): RowOrdering = {
- new RowOrdering(dataTypes.zipWithIndex.map {
- case (dt, index) => new SortOrder(BoundReference(index, dt, nullable = true), Ascending)
- })
- }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 68c832d719..fe7dffb815 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -376,7 +376,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryCompariso
override def symbol: String = "<"
- private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lt(input1, input2)
}
@@ -388,7 +388,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryCo
override def symbol: String = "<="
- private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.lteq(input1, input2)
}
@@ -400,7 +400,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryCompar
override def symbol: String = ">"
- private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gt(input1, input2)
}
@@ -412,7 +412,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends Binar
override def symbol: String = ">="
- private lazy val ordering = TypeUtils.getOrdering(left.dataType)
+ private lazy val ordering = TypeUtils.getInterpretedOrdering(left.dataType)
protected override def nullSafeEval(input1: Any, input2: Any): Any = ordering.gteq(input1, input2)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
index 0b41f92c61..bcf4d78fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala
@@ -54,10 +54,10 @@ object TypeUtils {
def getNumeric(t: DataType): Numeric[Any] =
t.asInstanceOf[NumericType].numeric.asInstanceOf[Numeric[Any]]
- def getOrdering(t: DataType): Ordering[Any] = {
+ def getInterpretedOrdering(t: DataType): Ordering[Any] = {
t match {
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
- case s: StructType => s.ordering.asInstanceOf[Ordering[Any]]
+ case s: StructType => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 6928707f7b..9cbc207538 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -24,7 +24,7 @@ import org.json4s.JsonDSL._
import org.apache.spark.SparkException
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute, RowOrdering}
+import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, AttributeReference, Attribute, InterpretedOrdering$}
/**
@@ -301,7 +301,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
StructType(newFields)
}
- private[sql] val ordering = RowOrdering.forSchema(this.fields.map(_.dataType))
+ private[sql] val interpretedOrdering = InterpretedOrdering.forSchema(this.fields.map(_.dataType))
}
object StructType extends AbstractDataType {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
index cc82f7c3f5..e310aee221 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala
@@ -54,7 +54,7 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
// GenerateOrdering agrees with RowOrdering.
(DataTypeTestUtils.atomicTypes ++ Set(NullType)).foreach { dataType =>
test(s"GenerateOrdering with $dataType") {
- val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
+ val rowOrdering = InterpretedOrdering.forSchema(Seq(dataType, dataType))
val genOrdering = GenerateOrdering.generate(
BoundReference(0, dataType, nullable = true).asc ::
BoundReference(1, dataType, nullable = true).asc :: Nil)