aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-11-07 19:44:45 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-07 19:44:45 -0800
commit4b69a42eda3aff08eb7437c353fe2cc87ed67181 (patch)
tree5d8b2bbefed73905acff0870644682a1575e2b23 /sql
parentef362846eb448769bcf774fc9090a5013d459464 (diff)
downloadspark-4b69a42eda3aff08eb7437c353fe2cc87ed67181.tar.gz
spark-4b69a42eda3aff08eb7437c353fe2cc87ed67181.tar.bz2
spark-4b69a42eda3aff08eb7437c353fe2cc87ed67181.zip
[SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin
JIRA: https://issues.apache.org/jira/browse/SPARK-11362 We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet. Author: Liang-Chi Hsieh <viirya@appier.com> Closes #9316 from viirya/use-spark-bitset.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala18
1 files changed, 8 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 05d20f511a..aab177b2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
case class BroadcastNestedLoopJoin(
@@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin(
/** All rows that either match both-way, or rows from streamed joined with nulls. */
val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
val matchedRows = new CompactBuffer[InternalRow]
- // TODO: Use Spark's BitSet.
- val includedBroadcastTuples =
- new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
+ val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
val joinedRow = new JoinedRow
val leftNulls = new GenericMutableRow(left.output.size)
@@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin(
case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
streamRowMatched = true
- includedBroadcastTuples += i
+ includedBroadcastTuples.set(i)
case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
streamRowMatched = true
- includedBroadcastTuples += i
+ includedBroadcastTuples.set(i)
case _ =>
}
i += 1
@@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin(
val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2)
val allIncludedBroadcastTuples = includedBroadcastTuples.fold(
- new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
- )(_ ++ _)
+ new BitSet(broadcastedRelation.value.size)
+ )(_ | _)
val leftNulls = new GenericMutableRow(left.output.size)
val rightNulls = new GenericMutableRow(right.output.size)
@@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin(
val joinedRow = new JoinedRow
joinedRow.withLeft(leftNulls)
while (i < rel.length) {
- if (!allIncludedBroadcastTuples.contains(i)) {
+ if (!allIncludedBroadcastTuples.get(i)) {
buf += resultProj(joinedRow.withRight(rel(i))).copy()
}
i += 1
@@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin(
val joinedRow = new JoinedRow
joinedRow.withRight(rightNulls)
while (i < rel.length) {
- if (!allIncludedBroadcastTuples.contains(i)) {
+ if (!allIncludedBroadcastTuples.get(i)) {
buf += resultProj(joinedRow.withLeft(rel(i))).copy()
}
i += 1