aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2015-12-14 19:42:16 -0800
committerDavies Liu <davies.liu@gmail.com>2015-12-14 19:42:16 -0800
commit606f99b942a25dc3d86138b00026462ffe58cded (patch)
treec091eba97f6569363d714244cd3a5fafddc62161 /sql
parentd13ff82cba10a1ce9889c4b416f1e43c717e3f10 (diff)
downloadspark-606f99b942a25dc3d86138b00026462ffe58cded.tar.gz
spark-606f99b942a25dc3d86138b00026462ffe58cded.tar.bz2
spark-606f99b942a25dc3d86138b00026462ffe58cded.zip
[SPARK-12288] [SQL] Support UnsafeRow in Coalesce/Except/Intersect.
Support UnsafeRow for the Coalesce/Except/Intersect. Could you review if my code changes are ok? davies Thank you! Author: gatorsmile <gatorsmile@gmail.com> Closes #10285 from gatorsmile/unsafeSupportCIE.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala12
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala35
2 files changed, 46 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index a42aea0b96..b3e4688557 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -137,7 +137,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan {
}
}
}
- override def outputsUnsafeRows: Boolean = children.forall(_.outputsUnsafeRows)
+ override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true
protected override def doExecute(): RDD[InternalRow] =
@@ -250,7 +250,9 @@ case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode {
child.execute().coalesce(numPartitions, shuffle = false)
}
+ override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
+ override def canProcessSafeRows: Boolean = true
}
/**
@@ -263,6 +265,10 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
+
+ override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
+ override def canProcessUnsafeRows: Boolean = true
+ override def canProcessSafeRows: Boolean = true
}
/**
@@ -275,6 +281,10 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
protected override def doExecute(): RDD[InternalRow] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
+
+ override def outputsUnsafeRows: Boolean = children.exists(_.outputsUnsafeRows)
+ override def canProcessUnsafeRows: Boolean = true
+ override def canProcessSafeRows: Boolean = true
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 13d68a103a..2328899bb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -58,6 +58,41 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
assert(!preparedPlan.outputsUnsafeRows)
}
+ test("coalesce can process unsafe rows") {
+ val plan = Coalesce(1, outputsUnsafe)
+ val preparedPlan = sqlContext.prepareForExecution.execute(plan)
+ assert(getConverters(preparedPlan).size === 1)
+ assert(preparedPlan.outputsUnsafeRows)
+ }
+
+ test("except can process unsafe rows") {
+ val plan = Except(outputsUnsafe, outputsUnsafe)
+ val preparedPlan = sqlContext.prepareForExecution.execute(plan)
+ assert(getConverters(preparedPlan).size === 2)
+ assert(preparedPlan.outputsUnsafeRows)
+ }
+
+ test("except requires all of its input rows' formats to agree") {
+ val plan = Except(outputsSafe, outputsUnsafe)
+ assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
+ val preparedPlan = sqlContext.prepareForExecution.execute(plan)
+ assert(preparedPlan.outputsUnsafeRows)
+ }
+
+ test("intersect can process unsafe rows") {
+ val plan = Intersect(outputsUnsafe, outputsUnsafe)
+ val preparedPlan = sqlContext.prepareForExecution.execute(plan)
+ assert(getConverters(preparedPlan).size === 2)
+ assert(preparedPlan.outputsUnsafeRows)
+ }
+
+ test("intersect requires all of its input rows' formats to agree") {
+ val plan = Intersect(outputsSafe, outputsUnsafe)
+ assert(plan.canProcessSafeRows && plan.canProcessUnsafeRows)
+ val preparedPlan = sqlContext.prepareForExecution.execute(plan)
+ assert(preparedPlan.outputsUnsafeRows)
+ }
+
test("execute() fails an assertion if inputs rows are of different formats") {
val e = intercept[AssertionError] {
Union(Seq(outputsSafe, outputsUnsafe)).execute()