author    Tejas Patil <tejasp@fb.com>    2016-10-23 13:25:47 +0200
committer Herman van Hovell <hvanhovell@databricks.com>    2016-10-23 13:25:47 +0200
commit    21c7539a5274a7e77686d17a6261d56592b85c2d (patch)
tree      11f611e2b47dd957c68dbb60a967a3aaf6f59a31 /sql/hive
parent    eff4aed1ac1e500d4aa40665dd06b527dffbc111 (diff)
[SPARK-18038][SQL] Move output partitioning definition from UnaryExecNode to its children
## What changes were proposed in this pull request?

Jira: https://issues.apache.org/jira/browse/SPARK-18038

This was a suggestion by rxin in one of the dev list discussions: http://apache-spark-developers-list.1001551.n3.nabble.com/Project-not-preserving-child-partitioning-td19417.html

His words:

> It would be better (safer) to move the output partitioning definition into each of the operators and remove it from UnaryExecNode.

With this PR, the following are the output partitioning and ordering for all the implementations of `UnaryExecNode`:

UnaryExecNode's impl | outputPartitioning | outputOrdering | comment
------------ | ------------- | ------------ | ------------
AppendColumnsExec | child's | Nil | child's ordering can be used
AppendColumnsWithObjectExec | child's | Nil | child's ordering can be used
BroadcastExchangeExec | BroadcastPartitioning | Nil | -
CoalesceExec | UnknownPartitioning | Nil | -
CollectLimitExec | SinglePartition | Nil | -
DebugExec | child's | Nil | child's ordering can be used
DeserializeToObjectExec | child's | Nil | child's ordering can be used
ExpandExec | UnknownPartitioning | Nil | -
FilterExec | child's | child's | -
FlatMapGroupsInRExec | child's | Nil | child's ordering can be used
GenerateExec | child's | Nil | need to dig more
GlobalLimitExec | child's | child's | -
HashAggregateExec | child's | Nil | -
InputAdapter | child's | child's | -
InsertIntoHiveTable | child's | Nil | terminal node, doesn't need partitioning
LocalLimitExec | child's | child's | -
MapElementsExec | child's | child's | -
MapGroupsExec | child's | Nil | child's ordering can be used
MapPartitionsExec | child's | Nil | child's ordering can be used
ProjectExec | child's | child's | -
SampleExec | child's | Nil | child's ordering can be used
ScriptTransformation | child's | Nil | child's ordering can be used
SerializeFromObjectExec | child's | Nil | child's ordering can be used
ShuffleExchange | custom | Nil | -
SortAggregateExec | child's | sort over grouped exprs | -
SortExec | child's | custom | -
StateStoreRestoreExec | child's | Nil | child's ordering can be used
StateStoreSaveExec | child's | Nil | child's ordering can be used
SubqueryExec | child's | child's | -
TakeOrderedAndProjectExec | SinglePartition | custom | -
WholeStageCodegenExec | child's | child's | -
WindowExec | child's | child's | -

## How was this patch tested?

This does NOT change any existing functionality, so it relies on existing tests.

Author: Tejas Patil <tejasp@fb.com>

Closes #15575 from tejasapatil/SPARK-18038_UnaryNodeExec_output_partitioning.
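As a minimal sketch of the pattern this patch applies (the operator name `PassThroughExec` is hypothetical and not part of the commit): a unary operator that neither redistributes nor reorders rows now states the child's guarantees explicitly, since `UnaryExecNode` no longer supplies a default:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical operator that neither repartitions nor reorders its input,
// so it can safely advertise the child's partitioning and ordering.
private case class PassThroughExec(child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = child.output

  // Declared explicitly per operator, no longer inherited from UnaryExecNode.
  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def outputOrdering: Seq[SortOrder] = child.outputOrdering

  protected override def doExecute(): RDD[InternalRow] = child.execute()
}
```

Making each operator spell out its own guarantee means a new operator that actually redistributes rows can no longer inherit the child's partitioning by accident, which was the safety concern raised in the dev-list thread.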
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 3
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala | 4
3 files changed, 10 insertions, 1 deletion
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 53bb3b93db..c3c4e2925b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution
import java.io.IOException
import java.net.URI
import java.text.SimpleDateFormat
-import java.util
import java.util.{Date, Random}
import scala.collection.JavaConverters._
@@ -36,6 +35,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
@@ -291,6 +291,8 @@ case class InsertIntoHiveTable(
Seq.empty[InternalRow]
}
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 1025b8f70d..50855e48bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -38,6 +38,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
@@ -61,6 +62,8 @@ case class ScriptTransformation(
override def producedAttributes: AttributeSet = outputSet -- inputSet
+ override def outputPartitioning: Partitioning = child.outputPartitioning
+
protected override def doExecute(): RDD[InternalRow] = {
def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
: Iterator[InternalRow] = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index a8e81d7a3c..0e837766e2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, UnaryExecNode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.types.StringType
@@ -135,5 +136,8 @@ private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
throw new IllegalArgumentException("intentional exception")
}
}
+
override def output: Seq[Attribute] = child.output
+
+ override def outputPartitioning: Partitioning = child.outputPartitioning
}
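
One implication for custom operators, illustrated by the test change above (the sketch below is mine, not part of the patch): since `UnaryExecNode` no longer forwards the child's partitioning, an operator that omits the override falls back to `SparkPlan`'s default, which at the time of this commit is `UnknownPartitioning(0)`. That is always safe, but it may cause the planner to insert an otherwise avoidable exchange:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

// Hypothetical pass-through operator that forgets the override: its output
// partitioning now reports as unknown, so a downstream operator requiring a
// specific distribution will trigger a shuffle even though this operator
// never moved any rows.
private case class ForgetfulExec(child: SparkPlan) extends UnaryExecNode {
  override def output: Seq[Attribute] = child.output
  protected override def doExecute(): RDD[InternalRow] = child.execute()
}
```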