aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
author Daoyuan <daoyuan.wang@intel.com> 2014-06-11 12:08:28 -0700
committer Michael Armbrust <michael@databricks.com> 2014-06-11 12:09:42 -0700
commit ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2 (patch)
tree e6486802508d79cc289cc3e050289635bc3e02f0 /sql/core
parent 4107cce58c41160a0dc20339621eacdf8a8b1191 (diff)
download spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.tar.gz
spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.tar.bz2
spark-ce6deb1e5b4cd40c97730fcf5dc89cb2f624bce2.zip
[SQL] Code Cleanup: Left Semi Hash Join
Some improvements for PR #837: add another case to the white list and use `filter` to build the result iterator. Author: Daoyuan <daoyuan.wang@intel.com> Closes #1049 from adrian-wang/clean-LeftSemiJoinHash and squashes the following commits: b314d5a [Daoyuan] change hashSet name 27579a9 [Daoyuan] add semijoin to white list and use filter to create new iterator in LeftSemiJoinBNL Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/core')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala 40
1 files changed, 7 insertions, 33 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
index 88ff3d49a7..8d7a5ba59f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala
@@ -169,7 +169,7 @@ case class LeftSemiJoinHash(
def execute() = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
- val hashTable = new java.util.HashSet[Row]()
+ val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
// Create a Hash set of buildKeys
@@ -177,43 +177,17 @@ case class LeftSemiJoinHash(
currentRow = buildIter.next()
val rowKey = buildSideKeyGenerator(currentRow)
if(!rowKey.anyNull) {
- val keyExists = hashTable.contains(rowKey)
+ val keyExists = hashSet.contains(rowKey)
if (!keyExists) {
- hashTable.add(rowKey)
+ hashSet.add(rowKey)
}
}
}
- new Iterator[Row] {
- private[this] var currentStreamedRow: Row = _
- private[this] var currentHashMatched: Boolean = false
-
- private[this] val joinKeys = streamSideKeyGenerator()
-
- override final def hasNext: Boolean =
- streamIter.hasNext && fetchNext()
-
- override final def next() = {
- currentStreamedRow
- }
-
- /**
- * Searches the streamed iterator for the next row that has at least one match in hashtable.
- *
- * @return true if the search is successful, and false the streamed iterator runs out of
- * tuples.
- */
- private final def fetchNext(): Boolean = {
- currentHashMatched = false
- while (!currentHashMatched && streamIter.hasNext) {
- currentStreamedRow = streamIter.next()
- if (!joinKeys(currentStreamedRow).anyNull) {
- currentHashMatched = hashTable.contains(joinKeys.currentValue)
- }
- }
- currentHashMatched
- }
- }
+ val joinKeys = streamSideKeyGenerator()
+ streamIter.filter(current => {
+ !joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)
+ })
}
}
}