 sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+), 0 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 21325beb1c..6b7b3bbbf6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -370,6 +370,10 @@ case class MapPartitions[T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
@@ -391,6 +395,7 @@ case class AppendColumns[T, U](
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
 
   override def output: Seq[Attribute] = child.output ++ newColumns
 
@@ -420,6 +425,10 @@ case class MapGroups[K, T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -459,6 +468,10 @@ case class CoGroup[Key, Left, Right, Result](
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
 