about summary refs log tree commit diff
diff options
context:
space:
mode:
author Yin Huai <yhuai@databricks.com> 2015-08-14 17:35:17 -0700
committer Reynold Xin <rxin@databricks.com> 2015-08-14 17:35:17 -0700
commit 932b24fd144232fb08184f0bd0a46369ecba164e (patch)
tree 8a4b93b9189dd164dfc4cc23b332bce6991ea32c
parent 18a761ef7a01a4dfa1dd91abe78cd68f2f8fdb67 (diff)
download spark-932b24fd144232fb08184f0bd0a46369ecba164e.tar.gz
download spark-932b24fd144232fb08184f0bd0a46369ecba164e.tar.bz2
download spark-932b24fd144232fb08184f0bd0a46369ecba164e.zip
[SPARK-9949] [SQL] Fix TakeOrderedAndProject's output.
https://issues.apache.org/jira/browse/SPARK-9949 Author: Yin Huai <yhuai@databricks.com> Closes #8179 from yhuai/SPARK-9949.
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 12
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala | 20
2 files changed, 28 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 247c900baa..77b98064a9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -237,7 +237,10 @@ case class TakeOrderedAndProject(
projectList: Option[Seq[NamedExpression]],
child: SparkPlan) extends UnaryNode {
- override def output: Seq[Attribute] = child.output
+ override def output: Seq[Attribute] = {
+ val projectOutput = projectList.map(_.map(_.toAttribute))
+ projectOutput.getOrElse(child.output)
+ }
override def outputPartitioning: Partitioning = SinglePartition
@@ -263,6 +266,13 @@ case class TakeOrderedAndProject(
protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1)
override def outputOrdering: Seq[SortOrder] = sortOrder
+
+ override def simpleString: String = {
+ val orderByString = sortOrder.mkString("[", ",", "]")
+ val outputString = output.mkString("[", ",", "]")
+
+ s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
+ }
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 937a108543..fad93b014c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -162,9 +162,23 @@ class PlannerSuite extends SparkFunSuite with SharedSQLContext {
}
test("efficient limit -> project -> sort") {
- val query = testData.sort('key).select('value).limit(2).logicalPlan
- val planned = ctx.planner.TakeOrderedAndProject(query)
- assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+ {
+ val query =
+ testData.select('key, 'value).sort('key).limit(2).logicalPlan
+ val planned = ctx.planner.TakeOrderedAndProject(query)
+ assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+ assert(planned.head.output === testData.select('key, 'value).logicalPlan.output)
+ }
+
+ {
+ // We need to make sure TakeOrderedAndProject's output is correct when we push a project
+ // into it.
+ val query =
+ testData.select('key, 'value).sort('key).select('value, 'key).limit(2).logicalPlan
+ val planned = ctx.planner.TakeOrderedAndProject(query)
+ assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject])
+ assert(planned.head.output === testData.select('value, 'key).logicalPlan.output)
+ }
}
test("PartitioningCollection") {