aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorHerman van Hovell <hvanhovell@questtec.nl>2016-02-21 12:32:31 -0800
committerReynold Xin <rxin@databricks.com>2016-02-21 12:32:31 -0800
commitb6a873d6d4682796f55dbafadd0b5cad881f96ea (patch)
treef1d7431df3827d27fdd3eec7bd2503336d08ecc5 /sql/catalyst
parentaf441ddbd13f48c09b458c451d7bba3965a878d1 (diff)
downloadspark-b6a873d6d4682796f55dbafadd0b5cad881f96ea.tar.gz
spark-b6a873d6d4682796f55dbafadd0b5cad881f96ea.tar.bz2
spark-b6a873d6d4682796f55dbafadd0b5cad881f96ea.zip
[SPARK-13136][SQL] Create a dedicated Broadcast exchange operator
Quite a few Spark SQL join operators broadcast one side of the join to all nodes. The are a few problems with this: - This conflates broadcasting (a data exchange) with joining. Data exchanges should be managed by a different operator. - All these nodes implement their own (duplicate) broadcasting logic. - Re-use of indices is quite hard. This PR defines both a ```BroadcastDistribution``` and ```BroadcastPartitioning```, these contain a `BroadcastMode`. The `BroadcastMode` defines the way in which we transform the Array of `InternalRow`'s into an index. We currently support the following `BroadcastMode`'s: - IdentityBroadcastMode: This broadcasts the rows in their original form. - HashSetBroadcastMode: This applies a projection to the input rows, deduplicates these rows and broadcasts the resulting `Set`. - HashedRelationBroadcastMode: This transforms the input rows into a `HashedRelation`, and broadcasts this index. To match this distribution we implement a ```BroadcastExchange``` operator which will perform the broadcast for us, and have ```EnsureRequirements``` plan this operator. The old Exchange operator has been renamed into ShuffleExchange in order to clearly separate between Shuffled and Broadcasted exchanges. Finally the classes in Exchange.scala have been moved to a dedicated package. cc rxin davies Author: Herman van Hovell <hvanhovell@questtec.nl> Closes #11083 from hvanhovell/SPARK-13136.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala35
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala30
2 files changed, 64 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
new file mode 100644
index 0000000000..c646dcfa11
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.physical
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+/**
+ * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are
+ * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index).
+ */
+trait BroadcastMode {
+ def transform(rows: Array[InternalRow]): Any
+}
+
+/**
+ * IdentityBroadcastMode requires that rows are broadcasted in their original form.
+ */
+case object IdentityBroadcastMode extends BroadcastMode {
+ override def transform(rows: Array[InternalRow]): Array[InternalRow] = rows
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index d6e10c412c..45e2841ec9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.physical
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{DataType, IntegerType}
@@ -76,6 +77,12 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
}
/**
+ * Represents data where tuples are broadcasted to every node. It is quite common that the
+ * entire set of tuples is transformed into different data structure.
+ */
+case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+
+/**
* Describes how an operator's output is split across partitions. The `compatibleWith`,
* `guarantees`, and `satisfies` methods describe relationships between child partitionings,
* target partitionings, and [[Distribution]]s. These relations are described more precisely in
@@ -213,7 +220,10 @@ case class RoundRobinPartitioning(numPartitions: Int) extends Partitioning {
case object SinglePartition extends Partitioning {
val numPartitions = 1
- override def satisfies(required: Distribution): Boolean = true
+ override def satisfies(required: Distribution): Boolean = required match {
+ case _: BroadcastDistribution => false
+ case _ => true
+ }
override def compatibleWith(other: Partitioning): Boolean = other.numPartitions == 1
@@ -351,3 +361,21 @@ case class PartitioningCollection(partitionings: Seq[Partitioning])
partitionings.map(_.toString).mkString("(", " or ", ")")
}
}
+
+/**
+ * Represents a partitioning where rows are collected, transformed and broadcasted to each
+ * node in the cluster.
+ */
+case class BroadcastPartitioning(mode: BroadcastMode) extends Partitioning {
+ override val numPartitions: Int = 1
+
+ override def satisfies(required: Distribution): Boolean = required match {
+ case BroadcastDistribution(m) if m == mode => true
+ case _ => false
+ }
+
+ override def compatibleWith(other: Partitioning): Boolean = other match {
+ case BroadcastPartitioning(m) if m == mode => true
+ case _ => false
+ }
+}