diff options
author | Josh Rosen <joshrosen@databricks.com> | 2015-10-07 15:53:37 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-10-07 15:53:37 -0700 |
commit | 7e2e268289828ae664622c59b90d82938d957ff3 (patch) | |
tree | 2466abe4272aa7afd3d6b561641219b49b5514d9 /sql/catalyst | |
parent | 37526aca2430e36a931fbe6e01a152e701a1b94e (diff) | |
download | spark-7e2e268289828ae664622c59b90d82938d957ff3.tar.gz spark-7e2e268289828ae664622c59b90d82938d957ff3.tar.bz2 spark-7e2e268289828ae664622c59b90d82938d957ff3.zip |
[SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`.
Author: Josh Rosen <joshrosen@databricks.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Closes #8083 from JoshRosen/SPARK-9702.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 5ac3f1f5b0..86b9417477 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -194,6 +194,22 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning { override def guarantees(other: Partitioning): Boolean = false } +/** + * Represents a partitioning where rows are distributed evenly across output partitions + * by starting from a random target partition number and distributing rows in a round-robin + * fashion. This partitioning is used when implementing the DataFrame.repartition() operator. + */ +case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning { + override def satisfies(required: Distribution): Boolean = required match { + case UnspecifiedDistribution => true + case _ => false + } + + override def compatibleWith(other: Partitioning): Boolean = false + + override def guarantees(other: Partitioning): Boolean = false +} + case object SinglePartition extends Partitioning { val numPartitions = 1 |