diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-10-07 15:53:37 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-10-07 15:53:37 -0700 |
commit | 7e2e268289828ae664622c59b90d82938d957ff3 (patch) | |
tree | 2466abe4272aa7afd3d6b561641219b49b5514d9 /sql/core | |
parent | 37526aca2430e36a931fbe6e01a152e701a1b94e (diff) | |
download | spark-7e2e268289828ae664622c59b90d82938d957ff3.tar.gz spark-7e2e268289828ae664622c59b90d82938d957ff3.tar.bz2 spark-7e2e268289828ae664622c59b90d82938d957ff3.zip |
[SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes #8083 from JoshRosen/SPARK-9702.
Diffstat (limited to 'sql/core')
3 files changed, 27 insertions, 13 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 029f2264a6..8efa471600 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import java.util.Random + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.serializer.Serializer @@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjectio import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.util.MutablePair -import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} +import org.apache.spark._ /** * :: DeveloperApi :: @@ -130,7 +132,6 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf private val serializer: Serializer = { - val rowDataTypes = child.output.map(_.dataType).toArray if (tungstenMode) { new UnsafeRowSerializer(child.output.size) } else { @@ -141,6 +142,7 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una protected override def doExecute(): RDD[InternalRow] = attachTree(this , "execute") { val rdd = child.execute() val part: Partitioner = newPartitioning match { + case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions) case HashPartitioning(expressions, numPartitions) => new HashPartitioner(numPartitions) case RangePartitioning(sortingExpressions, numPartitions) => // Internally, RangePartitioner runs a job on the RDD that samples keys to compute @@ -162,7 +164,15 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una case _ => sys.error(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. } - def getPartitionKeyExtractor(): InternalRow => InternalRow = newPartitioning match { + def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match { + case RoundRobinPartitioning(numPartitions) => + // Distributes elements evenly across output partitions, starting from a random partition. + var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions) + (row: InternalRow) => { + // The HashPartitioner will handle the `mod` by the number of partitions + position += 1 + position + } case HashPartitioning(expressions, _) => newMutableProjection(expressions, child.output)() case RangePartitioning(_, _) | SinglePartition => identity case _ => sys.error(s"Exchange not implemented for $newPartitioning") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b078c8b6b0..d1bbf2e20f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -336,7 +336,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { throw new IllegalStateException( "logical distinct operator should have been replaced by aggregate in the optimizer") case logical.Repartition(numPartitions, shuffle, child) => - execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil + if (shuffle) { + execution.Exchange(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil + } else { + execution.Coalesce(numPartitions, planLater(child)) :: Nil + } case logical.SortPartitions(sortExprs, child) => // This sort only sorts tuples within a partition. Its requiredDistribution will be // an UnspecifiedDistribution. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 3e49e0a357..d4bbbeb39e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -17,21 +17,20 @@ package org.apache.spark.sql.execution +import java.util.Random + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD} +import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.CatalystTypeConverters -import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.StructType -import org.apache.spark.util.collection.ExternalSorter -import org.apache.spark.util.collection.unsafe.sort.PrefixComparator import org.apache.spark.util.random.PoissonSampler -import org.apache.spark.util.{CompletionIterator, MutablePair} +import org.apache.spark.util.MutablePair import org.apache.spark.{HashPartitioner, SparkEnv} /** @@ -279,10 +278,12 @@ case class TakeOrderedAndProject( /** * :: DeveloperApi :: * Return a new RDD that has exactly `numPartitions` partitions. + * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. + * if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of + * the 100 new partitions will claim 10 of the current partitions. */ @DeveloperApi -case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) - extends UnaryNode { +case class Coalesce(numPartitions: Int, child: SparkPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = { @@ -291,11 +292,10 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan) } protected override def doExecute(): RDD[InternalRow] = { - child.execute().map(_.copy()).coalesce(numPartitions, shuffle) + child.execute().map(_.copy()).coalesce(numPartitions, shuffle = false) } } - /** * :: DeveloperApi :: * Returns a table with the elements from left that are not in right using |