aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-10-07 15:53:37 -0700
committerYin Huai <yhuai@databricks.com>2015-10-07 15:53:37 -0700
commit7e2e268289828ae664622c59b90d82938d957ff3 (patch)
tree2466abe4272aa7afd3d6b561641219b49b5514d9 /sql/catalyst
parent37526aca2430e36a931fbe6e01a152e701a1b94e (diff)
downloadspark-7e2e268289828ae664622c59b90d82938d957ff3.tar.gz
spark-7e2e268289828ae664622c59b90d82938d957ff3.tar.bz2
spark-7e2e268289828ae664622c59b90d82938d957ff3.zip
[SPARK-9702] [SQL] Use Exchange to implement logical Repartition operator
This patch allows `Repartition` to support UnsafeRows. This is accomplished by implementing the logical `Repartition` operator in terms of `Exchange` and a new `RoundRobinPartitioning`. Author: Josh Rosen <joshrosen@databricks.com> Author: Liang-Chi Hsieh <viirya@appier.com> Closes #8083 from JoshRosen/SPARK-9702.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala16
1 files changed, 16 insertions, 0 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index 5ac3f1f5b0..86b9417477 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -194,6 +194,22 @@ case class UnknownPartitioning(numPartitions: Int) extends Partitioning {
override def guarantees(other: Partitioning): Boolean = false
}
+/**
+ * Represents a partitioning where rows are distributed evenly across output partitions
+ * by starting from a random target partition number and distributing rows in a round-robin
+ * fashion. This partitioning is used when implementing the DataFrame.repartition() operator.
+ */
+case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
+ override def satisfies(required: Distribution): Boolean = required match {
+ case UnspecifiedDistribution => true
+ case _ => false
+ }
+
+ override def compatibleWith(other: Partitioning): Boolean = false
+
+ override def guarantees(other: Partitioning): Boolean = false
+}
+
case object SinglePartition extends Partitioning {
val numPartitions = 1