16 files changed, 76 insertions, 11 deletions
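In short: this patch removes the catch-all outputPartitioning default from UnaryExecNode, which silently forwarded child.outputPartitioning to every unary physical operator, and instead has each operator declare its output partitioning explicitly. Partition-preserving operators (ProjectExec, FilterExec, SortExec, the object and aggregate operators, and so on) forward the child's partitioning at their own definition sites, while TakeOrderedAndProjectExec, which collapses its input, declares SinglePartition. The pattern in miniature, as a self-contained toy sketch (simplified stand-in types, not Spark source):

// Toy stand-ins for Spark's Partitioning / SparkPlan hierarchy.
sealed trait Partitioning
case object SinglePartition extends Partitioning
case class HashPartitioning(keys: Seq[String]) extends Partitioning

trait PlanNode {
  // Abstract on purpose: a forgotten override is now a compile-time error,
  // not a silently inherited (and possibly wrong) value.
  def outputPartitioning: Partitioning
}

trait UnaryNode extends PlanNode {
  def child: PlanNode
}

// Partition-preserving: forwards the child's partitioning explicitly.
case class FilterNode(child: PlanNode) extends UnaryNode {
  override def outputPartitioning: Partitioning = child.outputPartitioning
}

// Partition-changing: must state what it actually produces.
case class TakeOrderedNode(child: PlanNode) extends UnaryNode {
  override def outputPartitioning: Partitioning = SinglePartition
}

object Demo extends App {
  val scan: PlanNode = new PlanNode {
    def outputPartitioning: Partitioning = HashPartitioning(Seq("id"))
  }
  println(FilterNode(scan).outputPartitioning)      // HashPartitioning(List(id))
  println(TakeOrderedNode(scan).outputPartitioning) // SinglePartition
}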
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index 39189a2b0c..2663129562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -60,6 +61,8 @@ case class GenerateExec(
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   val boundGenerator = BindReferences.bindReference(generator, child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index d8e0675e3e..cc576bbc4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -23,7 +23,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
-import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -45,6 +45,10 @@ case class SortExec(
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
+  // sort performed is local within a given partition so will retain
+  // child operator's partitioning
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 48d6ef6dcd..24d0cffef8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -395,8 +395,6 @@ trait UnaryExecNode extends SparkPlan {
   def child: SparkPlan
 
   override final def children: Seq[SparkPlan] = child :: Nil
-
-  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 trait BinaryExecNode extends SparkPlan {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 62bf6f4a81..6303483f22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -218,7 +218,9 @@ trait CodegenSupport extends SparkPlan {
 case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def doExecute(): RDD[InternalRow] = {
@@ -292,7 +294,9 @@ object WholeStageCodegenExec {
 case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override lazy val metrics = Map(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 06199ef3e8..4529ed067e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,6 +63,8 @@ case class HashAggregateExec(
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def producedAttributes: AttributeSet =
     AttributeSet(aggregateAttributes) ++
     AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 2a81a823c4..be3198b8e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.util.Utils
@@ -66,6 +66,8 @@ case class SortAggregateExec(
     groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
   }
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = {
     groupingExpressions.map(SortOrder(_, Ascending))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index dd78a78491..37d750e621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -78,6 +78,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 
@@ -214,6 +216,8 @@ case class FilterExec(condition: Expression, child: SparkPlan)
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -234,6 +238,8 @@ case class SampleExec(
     child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   override def output: Seq[Attribute] = child.output
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -517,7 +523,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
     "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
 
   override def output: Seq[Attribute] = child.output
+
   override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def sameResult(o: SparkPlan): Boolean = o match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index dd9d83767e..0395c43ba2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, CodegenContext, ExprCode}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.{AccumulatorV2, LongAccumulator}
 
 /**
@@ -162,6 +162,8 @@ package object debug {
       }
     }
 
+    override def outputPartitioning: Partitioning = child.outputPartitioning
+
     override def inputRDDs(): Seq[RDD[InternalRow]] = {
       child.asInstanceOf[CodegenSupport].inputRDDs()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 86a8770715..9918ac327f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.util.Utils
 
-
 /**
  * Take the first `limit` elements and collect them to a single partition.
  *
@@ -54,8 +53,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
 trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   val limit: Int
   override def output: Seq[Attribute] = child.output
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter =>
     iter.take(limit)
   }
@@ -95,14 +93,22 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
 
 /**
  * Take the first `limit` elements of each child partition, but do not collect or shuffle them.
  */
 case class LocalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
  * Take the first `limit` elements of the child's single output partition.
  */
 case class GlobalLimitExec(limit: Int, child: SparkPlan) extends BaseLimitExec {
+  override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 }
 
 /**
@@ -122,8 +128,6 @@ case class TakeOrderedAndProjectExec(
     projectList.map(_.toAttribute)
   }
 
-  override def outputPartitioning: Partitioning = SinglePartition
-
   override def executeCollect(): Array[InternalRow] = {
     val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
@@ -160,6 +164,8 @@
 
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
+  override def outputPartitioning: Partitioning = SinglePartition
+
   override def simpleString: String = {
     val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
     val outputString = Utils.truncatedString(output, "[", ",", "]")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index 2acc5110e8..9df56bbf1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -68,6 +68,8 @@ case class DeserializeToObjectExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec with CodegenSupport {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -102,6 +104,8 @@ case class SerializeFromObjectExec(
 
   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
     child.asInstanceOf[CodegenSupport].inputRDDs()
   }
@@ -171,6 +175,8 @@ case class MapPartitionsExec(
     child: SparkPlan)
   extends ObjectConsumerExec with ObjectProducerExec {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
@@ -231,6 +237,8 @@ case class MapElementsExec(
   }
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -244,6 +252,8 @@ case class AppendColumnsExec(
 
   override def output: Seq[Attribute] = child.output ++ serializer.map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   private def newColumnSchema = serializer.map(_.toAttribute).toStructType
 
   override protected def doExecute(): RDD[InternalRow] = {
@@ -272,6 +282,8 @@ case class AppendColumnsWithObjectExec(
   override def output: Seq[Attribute] =
     (inputSerializer ++ newColumnsSerializer).map(_.toAttribute)
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   private def inputSchema = inputSerializer.map(_.toAttribute).toStructType
 
   private def newColumnSchema = newColumnsSerializer.map(_.toAttribute).toStructType
@@ -304,6 +316,8 @@ case class MapGroupsExec(
     outputObjAttr: Attribute,
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
@@ -347,6 +361,9 @@ case class FlatMapGroupsInRExec(
     child: SparkPlan) extends UnaryExecNode with ObjectProducerExec {
 
   override def output: Seq[Attribute] = outputObjAttr :: Nil
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def producedAttributes: AttributeSet = AttributeSet(outputObjAttr)
 
   override def requiredChildDistribution: Seq[Distribution] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
index 587ea7d02a..ad8238f189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulAggregate.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.execution.streaming.state._
@@ -80,7 +81,10 @@ case class StateStoreRestoreExec(
       }
     }
   }
+
   override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
 
 /**
@@ -116,6 +120,8 @@ case class StateStoreSaveExec(
 
   override def output: Seq[Attribute] = child.output
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   /**
    * Save all the rows to the state store, and return all the rows in the state store.
    * Note that this returns an iterator that pipelines the saving to store with downstream
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
index 1dd281ebf1..80b87d5ffa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala
@@ -103,6 +103,8 @@ case class WindowExec(
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   /**
    * Create a bound ordering object for a given frame type and offset. A bound ordering object is
    * used to determine which input row lies within the frame boundaries of an output row.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
index a19ea51af7..6abcb1f067 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala
@@ -57,4 +57,6 @@ case class ReferenceSort(
 
   override def output: Seq[Attribute] = child.output
   override def outputOrdering: Seq[SortOrder] = sortOrder
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 53bb3b93db..c3c4e2925b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
 import java.io.IOException
 import java.net.URI
 import java.text.SimpleDateFormat
-import java.util
 import java.util.{Date, Random}
 
 import scala.collection.JavaConverters._
@@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -291,6 +291,8 @@ case class InsertIntoHiveTable(
     Seq.empty[InternalRow]
   }
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 1025b8f70d..50855e48bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveInspectors
 import org.apache.spark.sql.hive.HiveShim._
@@ -61,6 +62,8 @@ case class ScriptTransformation(
 
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
   protected override def doExecute(): RDD[InternalRow] = {
     def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
       : Iterator[InternalRow] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index a8e81d7a3c..0e837766e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.types.StringType
@@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
       throw new IllegalArgumentException("intentional exception")
     }
   }
+
   override def output: Seq[Attribute] = child.output
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
 }
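For context on why the advertised partitioning matters downstream: Spark's EnsureRequirements rule compares each child's outputPartitioning against its parent's required distribution and inserts a shuffle (exchange) only when the claim falls short; that is also why GlobalLimitExec above states requiredChildDistribution as AllTuples :: Nil, so the planner first brings all rows into one partition. A toy sketch of that decision, again with simplified stand-in types rather than Spark's actual API:

// Toy model of the shuffle-elision check; all names are illustrative stand-ins.
object ShuffleDecision {
  sealed trait Distribution
  case object UnspecifiedDist extends Distribution
  case class ClusteredDist(keys: Seq[String]) extends Distribution

  sealed trait Part
  case object SinglePart extends Part
  case class HashPart(keys: Seq[String]) extends Part

  def satisfies(p: Part, d: Distribution): Boolean = (p, d) match {
    case (_, UnspecifiedDist)               => true  // no requirement to meet
    case (SinglePart, _)                    => true  // one partition satisfies everything
    case (HashPart(ks), ClusteredDist(req)) => req.nonEmpty && req.forall(ks.contains)
    case _                                  => false
  }

  // Shuffle only when the advertised partitioning does not satisfy the requirement.
  def needsShuffle(childPart: Part, required: Distribution): Boolean =
    !satisfies(childPart, required)
}

Under a blanket default, an operator that drops or renames the columns its child was hash-partitioned on could keep advertising that partitioning, and a check like satisfies above might then skip a shuffle that is actually needed. Making the override mandatory turns that silent inheritance into an explicit claim that each operator states, and each reviewer can check, deliberately.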