diff options
author | gatorsmile <gatorsmile@gmail.com> | 2015-12-14 19:42:16 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-12-14 19:42:16 -0800 |
commit | 606f99b942a25dc3d86138b00026462ffe58cded (patch) | |
tree | c091eba97f6569363d714244cd3a5fafddc62161 | |
parent | d13ff82cba10a1ce9889c4b416f1e43c717e3f10 (diff) | |
download | spark-606f99b942a25dc3d86138b00026462ffe58cded.tar.gz spark-606f99b942a25dc3d86138b00026462ffe58cded.tar.bz2 spark-606f99b942a25dc3d86138b00026462ffe58cded.zip |
[SPARK-12288] [SQL] Support UnsafeRow in Coalesce/Except/Intersect.
Support UnsafeRow for the Coalesce/Except/Intersect.
Could you review if my code changes are ok? davies Thank you!
Author: gatorsmile <gatorsmile@gmail.com>
Closes #10285 from gatorsmile/unsafeSupportCIE.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 12 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala | 35 |
2 files changed, 46 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index a42aea0b96..b3e4688557 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { } } } - override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows) + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) override def canProcessUnsafeRows: Boolean = true override def canProcessSafeRows: Boolean = true protected override def doExecute(): RDD[InternalRow] = @@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode { child.execute().coalesce(numPartitions, shuffle = false) } + override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** @@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).subtract(right.execute().map(_.copy())) } + + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** @@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { protected override def doExecute(): RDD[InternalRow] = { left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) } + + override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows) + override def canProcessUnsafeRows: Boolean = true + override def canProcessSafeRows: Boolean = true } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala index 13d68a103a..2328899bb2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala @@ -58,6 +58,41 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext { assert(!preparedPlan.outputsUnsafeRows) } + test("coalesce can process unsafe rows") { + val plan = Coalesce(1, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 1) + assert(preparedPlan.outputsUnsafeRows) + } + + test("except can process unsafe rows") { + val plan = Except(outputsUnsafe, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 2) + assert(preparedPlan.outputsUnsafeRows) + } + + test("except requires all of its input rows' formats to agree") { + val plan = Except(outputsSafe, outputsUnsafe) + assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(preparedPlan.outputsUnsafeRows) + } + + test("intersect can process unsafe rows") { + val plan = Intersect(outputsUnsafe, outputsUnsafe) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(getConverters(preparedPlan).size === 2) + assert(preparedPlan.outputsUnsafeRows) + } + + test("intersect requires all of its input rows' formats to agree") { + val plan = Intersect(outputsSafe, outputsUnsafe) + assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows) + val preparedPlan = sqlContext.prepareForExecution.execute(plan) + assert(preparedPlan.outputsUnsafeRows) + } + test("execute() fails an assertion if inputs rows are of different formats") { val e = intercept[AssertionError] { Union(Seq(outputsSafe, outputsUnsafe)).execute() |