author    gatorsmile <gatorsmile@gmail.com>    2015-12-28 12:23:28 -0800
committer Michael Armbrust <michael@databricks.com>    2015-12-28 12:23:28 -0800
commit    e01c6c8664d74d434e9b6b3c8c70570f01d4a0a4 (patch)
tree      7b454a9101f05281e5a7008df96cca7884a496d1 /sql
parent    73b70f076d4e22396b7e145f2ce5974fbf788048 (diff)
[SPARK-12287][SQL] Support UnsafeRow in MapPartitions/MapGroups/CoGroup
Support UnsafeRow in MapPartitions/MapGroups/CoGroup. Added a test case for MapPartitions. Since MapGroups and CoGroup are built on AppendColumns, the existing Dataset test cases can already verify correctness when MapGroups and CoGroup process unsafe rows.

davies cloud-fan Not sure if my understanding is right; please correct me. Thank you!

Author: gatorsmile <gatorsmile@gmail.com>

Closes #10398 from gatorsmile/unsafeRowMapGroup.
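For readers unfamiliar with the flags this patch overrides: in Spark 1.6's physical planning, each operator advertises which row formats it can consume (canProcessSafeRows / canProcessUnsafeRows) and which it produces (outputsUnsafeRows), so the planner inserts a format conversion only where a child's output format and a parent's accepted formats disagree. Below is a minimal illustrative sketch of that pattern, not Spark's actual code; the trait name PhysicalNode, the class MapPartitionsLike, and the default flag values are assumptions for illustration.

trait PhysicalNode {
  // Assumed conservative defaults: consume safe rows only, emit safe rows.
  def canProcessSafeRows: Boolean = true
  def canProcessUnsafeRows: Boolean = false
  def outputsUnsafeRows: Boolean = false
  def children: Seq[PhysicalNode] = Nil
}

// After this patch, a MapPartitions-style operator accepts either format
// and always emits UnsafeRows, so no conversion node is needed around it.
class MapPartitionsLike extends PhysicalNode {
  override def canProcessSafeRows: Boolean = true
  override def canProcessUnsafeRows: Boolean = true
  override def outputsUnsafeRows: Boolean = true
}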
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala  13
1 file changed, 13 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 21325beb1c..6b7b3bbbf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -370,6 +370,10 @@ case class MapPartitions[T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
@@ -391,6 +395,7 @@ case class AppendColumns[T, U](
 
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
 
   override def output: Seq[Attribute] = child.output ++ newColumns
@@ -420,6 +425,10 @@ case class MapGroups[K, T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -459,6 +468,10 @@ case class CoGroup[Key, Left, Right, Result](
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
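Building on the hypothetical PhysicalNode sketch above, the snippet below shows roughly how a planner rule can consume these flags. Spark 1.6's actual rule (EnsureRowFormats, with ConvertToSafe/ConvertToUnsafe operators) is more involved; the converter classes and the ensureFormat helper here are assumptions for illustration only.

// Hypothetical converters: each consumes one row format and emits the other.
class ToUnsafeLike(val child: PhysicalNode) extends PhysicalNode {
  override def outputsUnsafeRows: Boolean = true   // safe rows in, unsafe rows out
  override def children: Seq[PhysicalNode] = Seq(child)
}

class ToSafeLike(val child: PhysicalNode) extends PhysicalNode {
  override def canProcessSafeRows: Boolean = false // unsafe rows in, safe rows out
  override def canProcessUnsafeRows: Boolean = true
  override def children: Seq[PhysicalNode] = Seq(child)
}

// Wrap a child in a converter only when it emits a format the parent
// cannot consume; otherwise leave the child untouched.
def ensureFormat(parent: PhysicalNode, child: PhysicalNode): PhysicalNode =
  if (child.outputsUnsafeRows && !parent.canProcessUnsafeRows) new ToSafeLike(child)
  else if (!child.outputsUnsafeRows && !parent.canProcessSafeRows) new ToUnsafeLike(child)
  else child

With all three flags overridden to true, MapPartitions, MapGroups, and CoGroup consume whatever format their children produce and hand UnsafeRows to their parents, so a rule like ensureFormat never has to wrap them; avoiding those safe/unsafe conversions is the point of this patch.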