aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala3
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala3
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala4
16 files changed, 76 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 39189a2b0c..2663129562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -60,6 +61,8 @@ case class GenerateExec(
override def producedAttributes: AttributeSet = AttributeSet(output)
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
val boundGenerator = BindReferences.bindReference(generator, child.output)
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index d8e0675e3e..cc576bbc4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
@@ -45,6 +45,10 @@ case class SortExec(
override def outputOrdering: Seq[SortOrder] = sortOrder
+ // sort performed is local within a given partition so will retain
+ // child operator's partitioning
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 48d6ef6dcd..24d0cffef8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -395,8 +395,6 @@ trait UnaryExecNode extends SparkPlan {
def child: SparkPlan
override final def children: Seq[SparkPlan] = child :: Nil
-
- override def outputPartitioning: Partitioning = child.outputPartitioning
}
trait BinaryExecNode extends SparkPlan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 62bf6f4a81..6303483f22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -218,7 +218,9 @@ trait CodegenSupport extends SparkPlan {
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
+
override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def doExecute(): RDD[InternalRow] = {
@@ -292,7 +294,9 @@ object WholeStageCodegenExec {
case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
+
override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override lazy val metrics = Map(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 06199ef3e8..4529ed067e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,6 +63,8 @@ case class HashAggregateExec(
override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def producedAttributes: AttributeSet =
AttributeSet(aggregateAttributes) ++
AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 2a81a823c4..be3198b8e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.Utils
@@ -66,6 +66,8 @@ case class SortAggregateExec(
groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
}
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def outputOrdering: Seq[SortOrder] = {
groupingExpressions.map(SortOrder(_, Ascending))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index dd78a78491..37d750e621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -78,6 +78,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
@@ -214,6 +216,8 @@ case class FilterExec(condition: Expression, child: SparkPlan)
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
/**
@@ -234,6 +238,8 @@ case class SampleExec(
child: SparkPlan) extends UnaryExecNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -517,7 +523,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
"collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
override def output: Seq[Attribute] = child.output
+
override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override def sameResult(o: SparkPlan): Boolean = o match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index dd9d83767e..0395c43ba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
/**
@@ -162,6 +162,8 @@ package object debug {
}
}
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 86a8770715..9918ac327f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.apache.spark.util.Utils
-
/**
* Take the first `limit` elements and collect them to a single partition.
*
@@ -54,8 +53,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
val limit: Int
override def output: Seq[Attribute] = child.output
- override def outputOrdering: Seq[SortOrder] = child.outputOrdering
- override def outputPartitioning: Partitioning = child.outputPartitioning
+
protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
iter.take(limit)
}
@@ -95,14 +93,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
* Take the first `limit` elements of each child partition, but do not collect or shuffle them.
*/
case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
/**
* Take the first `limit` elements of the child's single output partition.
*/
case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+
override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
+ override def outputOrdering: Seq[SortOrder] = child.outputOrdering
}
/**
@@ -122,8 +128,6 @@ case class TakeOrderedAndProjectExec(
projectList.map(_.toAttribute)
}
- override def outputPartitioning: Partitioning = SinglePartition
-
override def executeCollect(): Array[InternalRow] = {
val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
@@ -160,6 +164,8 @@ case class TakeOrderedAndProjectExec(
override def outputOrdering: Seq[SortOrder] = sortOrder
+ override def outputPartitioning: Partitioning = SinglePartition
+
override def simpleString: String = {
val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
val outputString = Utils.truncatedString(output, "[", ",", "]")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 2acc5110e8..9df56bbf1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -68,6 +68,8 @@ case class DeserializeToObjectExec(
outputObjAttr: Attribute,
child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport {
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}
@@ -102,6 +104,8 @@ case class SerializeFromObjectExec(
override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
}
@@ -171,6 +175,8 @@ case class MapPartitionsExec(
child: SparkPlan)
extends ObjectConsumerExec with ObjectProducerExec {
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override protected def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
@@ -231,6 +237,8 @@ case class MapElementsExec(
}
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
/**
@@ -244,6 +252,8 @@ case class AppendColumnsExec(
override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
private def newColumnSchema = serializer.map(_.toAttribute).toStructType
override protected def doExecute(): RDD[InternalRow] = {
@@ -272,6 +282,8 @@ case class AppendColumnsWithObjectExec(
override def output: Seq[Attribute] = (inputSerializer ++ newColumnsSerializer).map(_.toAttribute)
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
private def inputSchema = inputSerializer.map(_.toAttribute).toStructType
private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType
@@ -304,6 +316,8 @@ case class MapGroupsExec(
outputObjAttr: Attribute,
child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def requiredChildDistribution: Seq[Distribution] =
ClusteredDistribution(groupingAttributes) :: Nil
@@ -347,6 +361,9 @@ case class FlatMapGroupsInRExec(
child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
override def output: Seq[Attribute] = outputObjAttr :: Nil
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
override def requiredChildDistribution: Seq[Distribution] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 587ea7d02a..ad8238f189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.streaming.state._
@@ -80,7 +81,10 @@ case class StateStoreRestoreExec(
}
}
}
+
override def output: Seq[Attribute] = child.output
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
/**
@@ -116,6 +120,8 @@ case class StateStoreSaveExec(
override def output: Seq[Attribute] = child.output
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
/**
* Save all the rows to the state store, and return all the rows in the state store.
* Note that this returns an iterator that pipelines the saving to store with downstream
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 1dd281ebf1..80b87d5ffa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -103,6 +103,8 @@ case class WindowExec(
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
/**
* Create a bound ordering object for a given frame type and offset. A bound ordering object is
* used to determine which input row lies within the frame boundaries of an output row.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index a19ea51af7..6abcb1f067 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -57,4 +57,6 @@ case class ReferenceSort(
override def output: Seq[Attribute] = child.output
override def outputOrdering: Seq[SortOrder] = sortOrder
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 53bb3b93db..c3c4e2925b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
-import java.util
import java.util.{Date, Random}
import scala.collection.JavaConverters._
@@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -291,6 +291,8 @@ case class InsertIntoHiveTable(
Seq.empty[InternalRow]
}
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 1025b8f70d..50855e48bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
@@ -61,6 +62,8 @@ case class ScriptTransformation(
override def producedAttributes: AttributeSet = outputSet -- inputSet
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
protected override def doExecute(): RDD[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
: Iterator[InternalRow] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index a8e81d7a3c..0e837766e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.types.StringType
@@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExe
throw new IllegalArgumentException("intentional exception")
}
}
+
override def output: Seq[Attribute] = child.output
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}